Merge lp:~brianaker/gearmand/finish into lp:gearmand

Proposed by Brian Aker
Status: Merged
Merged at revision: 799
Proposed branch: lp:~brianaker/gearmand/finish
Merge into: lp:gearmand
Diff against target: 708 lines (+277/-214)
6 files modified
libgearman/error.hpp (+1/-0)
libgearman/function/function_v1.hpp (+1/-1)
libgearman/interface/worker.hpp (+5/-0)
libgearman/job.cc (+241/-211)
libgearman/job.hpp (+23/-1)
libgearman/worker.cc (+6/-1)
To merge this branch: bzr merge lp:~brianaker/gearmand/finish
Reviewer Review Type Date Requested Status
Tangent Trunk Pending
Review via email: mp+172716@code.launchpad.net
To post a comment you must log in.

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'libgearman/error.hpp'
--- libgearman/error.hpp 2013-02-04 06:00:39 +0000
+++ libgearman/error.hpp 2013-07-03 01:51:27 +0000
@@ -63,3 +63,4 @@
63 const char *position);63 const char *position);
6464
65void gearman_worker_reset_error(gearman_worker_st *worker);65void gearman_worker_reset_error(gearman_worker_st *worker);
66void gearman_worker_reset_error(Worker&);
6667
=== modified file 'libgearman/function/function_v1.hpp'
--- libgearman/function/function_v1.hpp 2012-08-20 17:07:47 +0000
+++ libgearman/function/function_v1.hpp 2013-07-03 01:51:27 +0000
@@ -63,7 +63,7 @@
63 }63 }
6464
65 job->error_code= GEARMAN_SUCCESS;65 job->error_code= GEARMAN_SUCCESS;
66 job->worker->impl()->work_result= _worker_fn(job, context_arg, &(job->worker->impl()->work_result_size), &job->error_code);66 job->_worker.work_result= _worker_fn(job, context_arg, &(job->_worker.work_result_size), &job->error_code);
6767
68 if (job->error_code == GEARMAN_LOST_CONNECTION)68 if (job->error_code == GEARMAN_LOST_CONNECTION)
69 {69 {
7070
=== modified file 'libgearman/interface/worker.hpp'
--- libgearman/interface/worker.hpp 2013-06-05 21:59:31 +0000
+++ libgearman/interface/worker.hpp 2013-07-03 01:51:27 +0000
@@ -127,6 +127,11 @@
127 universal.options._ssl= ssl_;127 universal.options._ssl= ssl_;
128 }128 }
129129
130 const char* error() const
131 {
132 return universal.error();
133 }
134
130private:135private:
131 gearman_worker_st* _shell;136 gearman_worker_st* _shell;
132 gearman_worker_st _owned_shell;137 gearman_worker_st _owned_shell;
133138
=== modified file 'libgearman/job.cc'
--- libgearman/job.cc 2013-06-15 07:55:07 +0000
+++ libgearman/job.cc 2013-07-03 01:51:27 +0000
@@ -170,42 +170,46 @@
170170
171gearman_job_st *gearman_job_create(gearman_worker_st *worker, gearman_job_st *job)171gearman_job_st *gearman_job_create(gearman_worker_st *worker, gearman_job_st *job)
172{172{
173 if (job)173 if (worker)
174 {174 {
175 job->options.allocated= false;175 if (job)
176 }176 {
177 else177 job->options.allocated= false;
178 {178 }
179 job= new (std::nothrow) gearman_job_st;179 else
180 if (job == NULL)180 {
181 {181 job= new (std::nothrow) gearman_job_st(*(worker->impl()));
182 gearman_perror(worker->impl()->universal, "new");182 if (job == NULL)
183 return NULL;183 {
184 }184 gearman_perror(job->_worker.universal, "new");
185185 return NULL;
186 job->options.allocated= true;186 }
187 }187
188188 job->options.allocated= true;
189 job->options.assigned_in_use= false;189 }
190 job->options.work_in_use= false;190
191 job->options.finished= false;191 job->options.assigned_in_use= false;
192192 job->options.work_in_use= false;
193 job->worker= worker;193 job->options.finished= false;
194 job->reducer= NULL;194
195 job->error_code= GEARMAN_UNKNOWN_STATE;195 job->reducer= NULL;
196196 job->error_code= GEARMAN_UNKNOWN_STATE;
197 if (worker->impl()->job_list)197
198 {198 if (job->_worker.job_list)
199 worker->impl()->job_list->prev= job;199 {
200 }200 job->_worker.job_list->prev= job;
201 job->next= worker->impl()->job_list;201 }
202 job->prev= NULL;202 job->next= job->_worker.job_list;
203 worker->impl()->job_list= job;203 job->prev= NULL;
204 worker->impl()->job_count++;204 job->_worker.job_list= job;
205205 job->_worker.job_count++;
206 job->con= NULL;206
207207 job->con= NULL;
208 return job;208
209 return job;
210 }
211
212 return NULL;
209}213}
210214
211bool gearman_job_build_reducer(gearman_job_st *job, gearman_aggregator_fn *aggregator_fn)215bool gearman_job_build_reducer(gearman_job_st *job, gearman_aggregator_fn *aggregator_fn)
@@ -217,7 +221,7 @@
217221
218 gearman_string_t reducer_func= gearman_job_reducer_string(job);222 gearman_string_t reducer_func= gearman_job_reducer_string(job);
219223
220 job->reducer= new (std::nothrow) gearman_job_reducer_st(job->worker->impl()->universal, reducer_func, aggregator_fn);224 job->reducer= new (std::nothrow) gearman_job_reducer_st(job->universal(), reducer_func, aggregator_fn);
221 if (not job->reducer)225 if (not job->reducer)
222 {226 {
223 gearman_job_free(job);227 gearman_job_free(job);
@@ -235,9 +239,9 @@
235239
236static inline void gearman_job_reset_error(gearman_job_st *job)240static inline void gearman_job_reset_error(gearman_job_st *job)
237{241{
238 if (job->worker)242 if (job)
239 {243 {
240 gearman_worker_reset_error(job->worker);244 gearman_worker_reset_error(job->_worker);
241 }245 }
242}246}
243247
@@ -245,36 +249,41 @@
245{249{
246 if (job)250 if (job)
247 {251 {
248 const void *args[2];252 if (job->finished() == false)
249 size_t args_size[2];253 {
250254 const void *args[2];
251 if (job->reducer)255 size_t args_size[2];
252 {256
253 gearman_argument_t value= gearman_argument_make(NULL, 0, static_cast<const char *>(data), data_size);257 if (job->reducer)
254 job->reducer->add(value);258 {
255259 gearman_argument_t value= gearman_argument_make(NULL, 0, static_cast<const char *>(data), data_size);
256 return GEARMAN_SUCCESS;260 job->reducer->add(value);
257 }261
258262 return GEARMAN_SUCCESS;
259 if ((job->options.work_in_use) == false)263 }
260 {264
261 args[0]= job->assigned.arg[0];265 if ((job->options.work_in_use) == false)
262 args_size[0]= job->assigned.arg_size[0];266 {
263 args[1]= data;267 args[0]= job->assigned.arg[0];
264 args_size[1]= data_size;268 args_size[0]= job->assigned.arg_size[0];
265 gearman_return_t ret= gearman_packet_create_args(job->worker->impl()->universal, job->work,269 args[1]= data;
266 GEARMAN_MAGIC_REQUEST,270 args_size[1]= data_size;
267 GEARMAN_COMMAND_WORK_DATA,271 gearman_return_t ret= gearman_packet_create_args(job->universal(), job->work,
268 args, args_size, 2);272 GEARMAN_MAGIC_REQUEST,
269 if (gearman_failed(ret))273 GEARMAN_COMMAND_WORK_DATA,
270 {274 args, args_size, 2);
271 return ret;275 if (gearman_failed(ret))
272 }276 {
273277 return ret;
274 job->options.work_in_use= true;278 }
275 }279
276280 job->options.work_in_use= true;
277 return _job_send(job);281 }
282
283 return _job_send(job);
284 }
285
286 return GEARMAN_SUCCESS;
278 }287 }
279288
280 return GEARMAN_INVALID_ARGUMENT;289 return GEARMAN_INVALID_ARGUMENT;
@@ -286,30 +295,35 @@
286{295{
287 if (job)296 if (job)
288 {297 {
289 const void *args[2];298 if (job->finished() == false)
290 size_t args_size[2];
291
292 if ((job->options.work_in_use) == false)
293 {299 {
294 args[0]= job->assigned.arg[0];300 const void *args[2];
295 args_size[0]= job->assigned.arg_size[0];301 size_t args_size[2];
296 args[1]= warning;
297 args_size[1]= warning_size;
298302
299 gearman_return_t ret;303 if ((job->options.work_in_use) == false)
300 ret= gearman_packet_create_args(job->worker->impl()->universal, job->work,
301 GEARMAN_MAGIC_REQUEST,
302 GEARMAN_COMMAND_WORK_WARNING,
303 args, args_size, 2);
304 if (gearman_failed(ret))
305 {304 {
306 return ret;305 args[0]= job->assigned.arg[0];
306 args_size[0]= job->assigned.arg_size[0];
307 args[1]= warning;
308 args_size[1]= warning_size;
309
310 gearman_return_t ret;
311 ret= gearman_packet_create_args(job->universal(), job->work,
312 GEARMAN_MAGIC_REQUEST,
313 GEARMAN_COMMAND_WORK_WARNING,
314 args, args_size, 2);
315 if (gearman_failed(ret))
316 {
317 return ret;
318 }
319
320 job->options.work_in_use= true;
307 }321 }
308322
309 job->options.work_in_use= true;323 return _job_send(job);
310 }324 }
311325
312 return _job_send(job);326 return GEARMAN_SUCCESS;
313 }327 }
314328
315 return GEARMAN_INVALID_ARGUMENT;329 return GEARMAN_INVALID_ARGUMENT;
@@ -321,37 +335,42 @@
321{335{
322 if (job)336 if (job)
323 {337 {
324 char numerator_string[12];338 if (job->finished() == false)
325 char denominator_string[12];
326 const void *args[3];
327 size_t args_size[3];
328
329 if (not (job->options.work_in_use))
330 {339 {
331 snprintf(numerator_string, 12, "%u", numerator);340 char numerator_string[12];
332 snprintf(denominator_string, 12, "%u", denominator);341 char denominator_string[12];
333342 const void *args[3];
334 args[0]= job->assigned.arg[0];343 size_t args_size[3];
335 args_size[0]= job->assigned.arg_size[0];344
336 args[1]= numerator_string;345 if (not (job->options.work_in_use))
337 args_size[1]= strlen(numerator_string) + 1;
338 args[2]= denominator_string;
339 args_size[2]= strlen(denominator_string);
340
341 gearman_return_t ret;
342 ret= gearman_packet_create_args(job->worker->impl()->universal, job->work,
343 GEARMAN_MAGIC_REQUEST,
344 GEARMAN_COMMAND_WORK_STATUS,
345 args, args_size, 3);
346 if (gearman_failed(ret))
347 {346 {
348 return ret;347 snprintf(numerator_string, 12, "%u", numerator);
348 snprintf(denominator_string, 12, "%u", denominator);
349
350 args[0]= job->assigned.arg[0];
351 args_size[0]= job->assigned.arg_size[0];
352 args[1]= numerator_string;
353 args_size[1]= strlen(numerator_string) + 1;
354 args[2]= denominator_string;
355 args_size[2]= strlen(denominator_string);
356
357 gearman_return_t ret;
358 ret= gearman_packet_create_args(job->universal(), job->work,
359 GEARMAN_MAGIC_REQUEST,
360 GEARMAN_COMMAND_WORK_STATUS,
361 args, args_size, 3);
362 if (gearman_failed(ret))
363 {
364 return ret;
365 }
366
367 job->options.work_in_use= true;
349 }368 }
350369
351 job->options.work_in_use= true;370 return _job_send(job);
352 }371 }
353372
354 return _job_send(job);373 return GEARMAN_SUCCESS;
355 }374 }
356375
357 return GEARMAN_INVALID_ARGUMENT;376 return GEARMAN_INVALID_ARGUMENT;
@@ -363,12 +382,17 @@
363{382{
364 if (job)383 if (job)
365 {384 {
366 if (job->reducer)385 if (job->finished() == false)
367 {386 {
368 return GEARMAN_INVALID_ARGUMENT;387 if (job->reducer)
388 {
389 return GEARMAN_INVALID_ARGUMENT;
390 }
391
392 return gearman_job_send_complete_fin(job, result, result_size);
369 }393 }
370394
371 return gearman_job_send_complete_fin(job, result, result_size);395 return GEARMAN_SUCCESS;
372 }396 }
373397
374 return GEARMAN_INVALID_ARGUMENT;398 return GEARMAN_INVALID_ARGUMENT;
@@ -379,66 +403,64 @@
379{403{
380 if (job)404 if (job)
381 {405 {
382 if (job->options.finished)406 if (job->finished() == false)
383 {407 {
384 return GEARMAN_SUCCESS;408 if (job->reducer)
385 }409 {
386410 if (result_size)
387 if (job->reducer)411 {
388 {412 gearman_argument_t value= gearman_argument_make(NULL, 0, static_cast<const char *>(result), result_size);
389 if (result_size)413 job->reducer->add(value);
390 {414 }
391 gearman_argument_t value= gearman_argument_make(NULL, 0, static_cast<const char *>(result), result_size);415
392 job->reducer->add(value);416 gearman_return_t rc= job->reducer->complete();
393 }417 if (gearman_failed(rc))
394418 {
395 gearman_return_t rc= job->reducer->complete();419 return gearman_error(job->universal(), rc, "The reducer's complete() returned an error");
396 if (gearman_failed(rc))420 }
397 {421
398 return gearman_error(job->worker->impl()->universal, rc, "The reducer's complete() returned an error");422 const gearman_vector_st *reduced_value= job->reducer->result.string();
399 }423 if (reduced_value)
400424 {
401 const gearman_vector_st *reduced_value= job->reducer->result.string();425 result= gearman_string_value(reduced_value);
402 if (reduced_value)426 result_size= gearman_string_length(reduced_value);
403 {427 }
404 result= gearman_string_value(reduced_value);428 else
405 result_size= gearman_string_length(reduced_value);429 {
406 }430 result= NULL;
407 else431 result_size= 0;
408 {432 }
409 result= NULL;433 }
410 result_size= 0;434
411 }435 const void *args[2];
412 } 436 size_t args_size[2];
413437
414 const void *args[2];438 if (not (job->options.work_in_use))
415 size_t args_size[2];439 {
416440 args[0]= job->assigned.arg[0];
417 if (not (job->options.work_in_use))441 args_size[0]= job->assigned.arg_size[0];
418 {442
419 args[0]= job->assigned.arg[0];443 args[1]= result;
420 args_size[0]= job->assigned.arg_size[0];444 args_size[1]= result_size;
421445 gearman_return_t ret= gearman_packet_create_args(job->_worker.universal, job->work,
422 args[1]= result;446 GEARMAN_MAGIC_REQUEST,
423 args_size[1]= result_size;447 GEARMAN_COMMAND_WORK_COMPLETE,
424 gearman_return_t ret= gearman_packet_create_args(job->worker->impl()->universal, job->work,448 args, args_size, 2);
425 GEARMAN_MAGIC_REQUEST,449 if (gearman_failed(ret))
426 GEARMAN_COMMAND_WORK_COMPLETE,450 {
427 args, args_size, 2);451 return ret;
452 }
453 job->options.work_in_use= true;
454 }
455
456 gearman_return_t ret= _job_send(job);
428 if (gearman_failed(ret))457 if (gearman_failed(ret))
429 {458 {
430 return ret;459 return ret;
431 }460 }
432 job->options.work_in_use= true;461
433 }462 job->finished(true);
434463 }
435 gearman_return_t ret= _job_send(job);
436 if (gearman_failed(ret))
437 {
438 return ret;
439 }
440
441 job->options.finished= true;
442464
443 return GEARMAN_SUCCESS;465 return GEARMAN_SUCCESS;
444 }466 }
@@ -452,21 +474,26 @@
452{474{
453 if (job)475 if (job)
454 {476 {
455 if (not (job->options.work_in_use))477 if (job->finished() == false)
456 {478 {
457 gearman_string_t handle_string= { static_cast<const char *>(job->assigned.arg[0]), job->assigned.arg_size[0] };479 if (job->options.work_in_use == false)
458 gearman_string_t exception_string= { static_cast<const char *>(exception), exception_size };
459
460 gearman_return_t ret= libgearman::protocol::work_exception(job->worker->impl()->universal, job->work, handle_string, exception_string);
461 if (gearman_failed(ret))
462 {480 {
463 return ret;481 gearman_string_t handle_string= { static_cast<const char *>(job->assigned.arg[0]), job->assigned.arg_size[0] };
482 gearman_string_t exception_string= { static_cast<const char *>(exception), exception_size };
483
484 gearman_return_t ret= libgearman::protocol::work_exception(job->_worker.universal, job->work, handle_string, exception_string);
485 if (gearman_failed(ret))
486 {
487 return ret;
488 }
489
490 job->options.work_in_use= true;
464 }491 }
465492
466 job->options.work_in_use= true;493 return _job_send(job);
467 }494 }
468495
469 return _job_send(job);496 return GEARMAN_SUCCESS;
470 }497 }
471498
472 return GEARMAN_INVALID_ARGUMENT;499 return GEARMAN_INVALID_ARGUMENT;
@@ -476,12 +503,17 @@
476{503{
477 if (job)504 if (job)
478 {505 {
479 if (job->reducer)506 if (job->finished() == false)
480 {507 {
481 return GEARMAN_INVALID_ARGUMENT;508 if (job->reducer)
509 {
510 return gearman_error(job->universal(), GEARMAN_INVALID_ARGUMENT, "Job has a reducer");
511 }
512
513 return gearman_job_send_fail_fin(job);
482 }514 }
483515
484 return gearman_job_send_fail_fin(job);516 return GEARMAN_SUCCESS;
485 }517 }
486518
487 return GEARMAN_INVALID_ARGUMENT;519 return GEARMAN_INVALID_ARGUMENT;
@@ -494,34 +526,32 @@
494 const void *args[1];526 const void *args[1];
495 size_t args_size[1];527 size_t args_size[1];
496528
497 if (job->options.finished)529 if (job->finished() == false)
498 {530 {
499 return GEARMAN_SUCCESS;531 if (not (job->options.work_in_use))
500 }532 {
501533 args[0]= job->assigned.arg[0];
502 if (not (job->options.work_in_use))534 args_size[0]= job->assigned.arg_size[0] - 1;
503 {535 gearman_return_t ret= gearman_packet_create_args(job->_worker.universal, job->work,
504 args[0]= job->assigned.arg[0];536 GEARMAN_MAGIC_REQUEST,
505 args_size[0]= job->assigned.arg_size[0] - 1;537 GEARMAN_COMMAND_WORK_FAIL,
506 gearman_return_t ret= gearman_packet_create_args(job->worker->impl()->universal, job->work,538 args, args_size, 1);
507 GEARMAN_MAGIC_REQUEST,539 if (gearman_failed(ret))
508 GEARMAN_COMMAND_WORK_FAIL,540 {
509 args, args_size, 1);541 return ret;
542 }
543
544 job->options.work_in_use= true;
545 }
546
547 gearman_return_t ret= _job_send(job);
510 if (gearman_failed(ret))548 if (gearman_failed(ret))
511 {549 {
512 return ret;550 return ret;
513 }551 }
514552
515 job->options.work_in_use= true;553 job->finished(true);
516 }554 }
517
518 gearman_return_t ret= _job_send(job);
519 if (gearman_failed(ret))
520 {
521 return ret;
522 }
523
524 job->options.finished= true;
525555
526 return GEARMAN_SUCCESS;556 return GEARMAN_SUCCESS;
527 }557 }
@@ -664,9 +694,9 @@
664 gearman_packet_free(&(job->work));694 gearman_packet_free(&(job->work));
665 }695 }
666696
667 if (job->worker->impl()->job_list == job)697 if (job->_worker.job_list == job)
668 {698 {
669 job->worker->impl()->job_list= job->next;699 job->_worker.job_list= job->next;
670 }700 }
671701
672 if (job->prev)702 if (job->prev)
@@ -678,7 +708,7 @@
678 {708 {
679 job->next->prev= job->prev;709 job->next->prev= job->prev;
680 }710 }
681 job->worker->impl()->job_count--;711 job->_worker.job_count--;
682712
683 delete job->reducer;713 delete job->reducer;
684 job->reducer= NULL;714 job->reducer= NULL;
@@ -700,7 +730,7 @@
700730
701 while ((ret == GEARMAN_IO_WAIT) or (ret == GEARMAN_TIMEOUT))731 while ((ret == GEARMAN_IO_WAIT) or (ret == GEARMAN_TIMEOUT))
702 {732 {
703 ret= gearman_wait(job->worker->impl()->universal);733 ret= gearman_wait(job->universal());
704 if (ret == GEARMAN_SUCCESS)734 if (ret == GEARMAN_SUCCESS)
705 {735 {
706 ret= job->con->send_packet(job->work, true);736 ret= job->con->send_packet(job->work, true);
@@ -720,9 +750,9 @@
720750
721const char *gearman_job_error(gearman_job_st *job)751const char *gearman_job_error(gearman_job_st *job)
722{752{
723 if (job and job->worker)753 if (job)
724 {754 {
725 return gearman_worker_error(job->worker);755 return job->_worker.error();
726 }756 }
727757
728 return NULL;758 return NULL;
729759
=== modified file 'libgearman/job.hpp'
--- libgearman/job.hpp 2012-08-19 21:00:10 +0000
+++ libgearman/job.hpp 2013-07-03 01:51:27 +0000
@@ -37,15 +37,32 @@
3737
38#pragma once38#pragma once
3939
40#include "libgearman/interface/worker.hpp"
41
40struct gearman_job_st42struct gearman_job_st
41{43{
44 gearman_job_st(Worker& worker_):
45 _worker(worker_)
46 { }
47
42 struct {48 struct {
43 bool allocated;49 bool allocated;
44 bool assigned_in_use;50 bool assigned_in_use;
45 bool work_in_use;51 bool work_in_use;
46 bool finished;52 bool finished;
47 } options;53 } options;
48 gearman_worker_st *worker;54
55 bool finished() const
56 {
57 return options.finished;
58 }
59
60 void finished(const bool finished_)
61 {
62 options.finished= finished_;
63 }
64
65 Worker& _worker;
49 gearman_job_st *next;66 gearman_job_st *next;
50 gearman_job_st *prev;67 gearman_job_st *prev;
51 gearman_connection_st *con;68 gearman_connection_st *con;
@@ -53,4 +70,9 @@
53 gearman_packet_st work;70 gearman_packet_st work;
54 struct gearman_job_reducer_st *reducer;71 struct gearman_job_reducer_st *reducer;
55 gearman_return_t error_code;72 gearman_return_t error_code;
73
74 gearman_universal_st& universal()
75 {
76 return _worker.universal;
77 }
56};78};
5779
=== modified file 'libgearman/worker.cc'
--- libgearman/worker.cc 2013-06-23 08:23:26 +0000
+++ libgearman/worker.cc 2013-07-03 01:51:27 +0000
@@ -971,6 +971,11 @@
971 return GEARMAN_INVALID_ARGUMENT;971 return GEARMAN_INVALID_ARGUMENT;
972}972}
973973
974void gearman_worker_reset_error(Worker& worker)
975{
976 universal_reset_error(worker.universal);
977}
978
974void gearman_worker_reset_error(gearman_worker_st *worker)979void gearman_worker_reset_error(gearman_worker_st *worker)
975{980{
976 if (worker and worker->impl())981 if (worker and worker->impl())
@@ -1354,7 +1359,7 @@
1354{1359{
1355 if (job)1360 if (job)
1356 {1361 {
1357 return gearman_worker_clone(NULL, job->worker);1362 return gearman_worker_clone(NULL, job->_worker.shell());
1358 }1363 }
13591364
1360 return NULL;1365 return NULL;

Subscribers

People subscribed via source and target branches

to all changes: