Merge lp:~brianaker/gearmand/finish into lp:gearmand

Proposed by Brian Aker on 2013-07-03
Status: Merged
Merged at revision: 799
Proposed branch: lp:~brianaker/gearmand/finish
Merge into: lp:gearmand
Diff against target: 708 lines (+277/-214)
6 files modified
libgearman/error.hpp (+1/-0)
libgearman/function/function_v1.hpp (+1/-1)
libgearman/interface/worker.hpp (+5/-0)
libgearman/job.cc (+241/-211)
libgearman/job.hpp (+23/-1)
libgearman/worker.cc (+6/-1)
To merge this branch: bzr merge lp:~brianaker/gearmand/finish
Reviewer Review Type Date Requested Status
Tangent Trunk 2013-07-03 Pending
Review via email: mp+172716@code.launchpad.net
To post a comment you must log in.

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'libgearman/error.hpp'
2--- libgearman/error.hpp 2013-02-04 06:00:39 +0000
3+++ libgearman/error.hpp 2013-07-03 01:51:27 +0000
4@@ -63,3 +63,4 @@
5 const char *position);
6
7 void gearman_worker_reset_error(gearman_worker_st *worker);
8+void gearman_worker_reset_error(Worker&);
9
10=== modified file 'libgearman/function/function_v1.hpp'
11--- libgearman/function/function_v1.hpp 2012-08-20 17:07:47 +0000
12+++ libgearman/function/function_v1.hpp 2013-07-03 01:51:27 +0000
13@@ -63,7 +63,7 @@
14 }
15
16 job->error_code= GEARMAN_SUCCESS;
17- job->worker->impl()->work_result= _worker_fn(job, context_arg, &(job->worker->impl()->work_result_size), &job->error_code);
18+ job->_worker.work_result= _worker_fn(job, context_arg, &(job->_worker.work_result_size), &job->error_code);
19
20 if (job->error_code == GEARMAN_LOST_CONNECTION)
21 {
22
23=== modified file 'libgearman/interface/worker.hpp'
24--- libgearman/interface/worker.hpp 2013-06-05 21:59:31 +0000
25+++ libgearman/interface/worker.hpp 2013-07-03 01:51:27 +0000
26@@ -127,6 +127,11 @@
27 universal.options._ssl= ssl_;
28 }
29
30+ const char* error() const
31+ {
32+ return universal.error();
33+ }
34+
35 private:
36 gearman_worker_st* _shell;
37 gearman_worker_st _owned_shell;
38
39=== modified file 'libgearman/job.cc'
40--- libgearman/job.cc 2013-06-15 07:55:07 +0000
41+++ libgearman/job.cc 2013-07-03 01:51:27 +0000
42@@ -170,42 +170,46 @@
43
44 gearman_job_st *gearman_job_create(gearman_worker_st *worker, gearman_job_st *job)
45 {
46- if (job)
47- {
48- job->options.allocated= false;
49- }
50- else
51- {
52- job= new (std::nothrow) gearman_job_st;
53- if (job == NULL)
54- {
55- gearman_perror(worker->impl()->universal, "new");
56- return NULL;
57- }
58-
59- job->options.allocated= true;
60- }
61-
62- job->options.assigned_in_use= false;
63- job->options.work_in_use= false;
64- job->options.finished= false;
65-
66- job->worker= worker;
67- job->reducer= NULL;
68- job->error_code= GEARMAN_UNKNOWN_STATE;
69-
70- if (worker->impl()->job_list)
71- {
72- worker->impl()->job_list->prev= job;
73- }
74- job->next= worker->impl()->job_list;
75- job->prev= NULL;
76- worker->impl()->job_list= job;
77- worker->impl()->job_count++;
78-
79- job->con= NULL;
80-
81- return job;
82+ if (worker)
83+ {
84+ if (job)
85+ {
86+ job->options.allocated= false;
87+ }
88+ else
89+ {
90+ job= new (std::nothrow) gearman_job_st(*(worker->impl()));
91+ if (job == NULL)
92+ {
93+ gearman_perror(job->_worker.universal, "new");
94+ return NULL;
95+ }
96+
97+ job->options.allocated= true;
98+ }
99+
100+ job->options.assigned_in_use= false;
101+ job->options.work_in_use= false;
102+ job->options.finished= false;
103+
104+ job->reducer= NULL;
105+ job->error_code= GEARMAN_UNKNOWN_STATE;
106+
107+ if (job->_worker.job_list)
108+ {
109+ job->_worker.job_list->prev= job;
110+ }
111+ job->next= job->_worker.job_list;
112+ job->prev= NULL;
113+ job->_worker.job_list= job;
114+ job->_worker.job_count++;
115+
116+ job->con= NULL;
117+
118+ return job;
119+ }
120+
121+ return NULL;
122 }
123
124 bool gearman_job_build_reducer(gearman_job_st *job, gearman_aggregator_fn *aggregator_fn)
125@@ -217,7 +221,7 @@
126
127 gearman_string_t reducer_func= gearman_job_reducer_string(job);
128
129- job->reducer= new (std::nothrow) gearman_job_reducer_st(job->worker->impl()->universal, reducer_func, aggregator_fn);
130+ job->reducer= new (std::nothrow) gearman_job_reducer_st(job->universal(), reducer_func, aggregator_fn);
131 if (not job->reducer)
132 {
133 gearman_job_free(job);
134@@ -235,9 +239,9 @@
135
136 static inline void gearman_job_reset_error(gearman_job_st *job)
137 {
138- if (job->worker)
139+ if (job)
140 {
141- gearman_worker_reset_error(job->worker);
142+ gearman_worker_reset_error(job->_worker);
143 }
144 }
145
146@@ -245,36 +249,41 @@
147 {
148 if (job)
149 {
150- const void *args[2];
151- size_t args_size[2];
152-
153- if (job->reducer)
154- {
155- gearman_argument_t value= gearman_argument_make(NULL, 0, static_cast<const char *>(data), data_size);
156- job->reducer->add(value);
157-
158- return GEARMAN_SUCCESS;
159- }
160-
161- if ((job->options.work_in_use) == false)
162- {
163- args[0]= job->assigned.arg[0];
164- args_size[0]= job->assigned.arg_size[0];
165- args[1]= data;
166- args_size[1]= data_size;
167- gearman_return_t ret= gearman_packet_create_args(job->worker->impl()->universal, job->work,
168- GEARMAN_MAGIC_REQUEST,
169- GEARMAN_COMMAND_WORK_DATA,
170- args, args_size, 2);
171- if (gearman_failed(ret))
172- {
173- return ret;
174- }
175-
176- job->options.work_in_use= true;
177- }
178-
179- return _job_send(job);
180+ if (job->finished() == false)
181+ {
182+ const void *args[2];
183+ size_t args_size[2];
184+
185+ if (job->reducer)
186+ {
187+ gearman_argument_t value= gearman_argument_make(NULL, 0, static_cast<const char *>(data), data_size);
188+ job->reducer->add(value);
189+
190+ return GEARMAN_SUCCESS;
191+ }
192+
193+ if ((job->options.work_in_use) == false)
194+ {
195+ args[0]= job->assigned.arg[0];
196+ args_size[0]= job->assigned.arg_size[0];
197+ args[1]= data;
198+ args_size[1]= data_size;
199+ gearman_return_t ret= gearman_packet_create_args(job->universal(), job->work,
200+ GEARMAN_MAGIC_REQUEST,
201+ GEARMAN_COMMAND_WORK_DATA,
202+ args, args_size, 2);
203+ if (gearman_failed(ret))
204+ {
205+ return ret;
206+ }
207+
208+ job->options.work_in_use= true;
209+ }
210+
211+ return _job_send(job);
212+ }
213+
214+ return GEARMAN_SUCCESS;
215 }
216
217 return GEARMAN_INVALID_ARGUMENT;
218@@ -286,30 +295,35 @@
219 {
220 if (job)
221 {
222- const void *args[2];
223- size_t args_size[2];
224-
225- if ((job->options.work_in_use) == false)
226+ if (job->finished() == false)
227 {
228- args[0]= job->assigned.arg[0];
229- args_size[0]= job->assigned.arg_size[0];
230- args[1]= warning;
231- args_size[1]= warning_size;
232+ const void *args[2];
233+ size_t args_size[2];
234
235- gearman_return_t ret;
236- ret= gearman_packet_create_args(job->worker->impl()->universal, job->work,
237- GEARMAN_MAGIC_REQUEST,
238- GEARMAN_COMMAND_WORK_WARNING,
239- args, args_size, 2);
240- if (gearman_failed(ret))
241+ if ((job->options.work_in_use) == false)
242 {
243- return ret;
244+ args[0]= job->assigned.arg[0];
245+ args_size[0]= job->assigned.arg_size[0];
246+ args[1]= warning;
247+ args_size[1]= warning_size;
248+
249+ gearman_return_t ret;
250+ ret= gearman_packet_create_args(job->universal(), job->work,
251+ GEARMAN_MAGIC_REQUEST,
252+ GEARMAN_COMMAND_WORK_WARNING,
253+ args, args_size, 2);
254+ if (gearman_failed(ret))
255+ {
256+ return ret;
257+ }
258+
259+ job->options.work_in_use= true;
260 }
261
262- job->options.work_in_use= true;
263+ return _job_send(job);
264 }
265
266- return _job_send(job);
267+ return GEARMAN_SUCCESS;
268 }
269
270 return GEARMAN_INVALID_ARGUMENT;
271@@ -321,37 +335,42 @@
272 {
273 if (job)
274 {
275- char numerator_string[12];
276- char denominator_string[12];
277- const void *args[3];
278- size_t args_size[3];
279-
280- if (not (job->options.work_in_use))
281+ if (job->finished() == false)
282 {
283- snprintf(numerator_string, 12, "%u", numerator);
284- snprintf(denominator_string, 12, "%u", denominator);
285-
286- args[0]= job->assigned.arg[0];
287- args_size[0]= job->assigned.arg_size[0];
288- args[1]= numerator_string;
289- args_size[1]= strlen(numerator_string) + 1;
290- args[2]= denominator_string;
291- args_size[2]= strlen(denominator_string);
292-
293- gearman_return_t ret;
294- ret= gearman_packet_create_args(job->worker->impl()->universal, job->work,
295- GEARMAN_MAGIC_REQUEST,
296- GEARMAN_COMMAND_WORK_STATUS,
297- args, args_size, 3);
298- if (gearman_failed(ret))
299+ char numerator_string[12];
300+ char denominator_string[12];
301+ const void *args[3];
302+ size_t args_size[3];
303+
304+ if (not (job->options.work_in_use))
305 {
306- return ret;
307+ snprintf(numerator_string, 12, "%u", numerator);
308+ snprintf(denominator_string, 12, "%u", denominator);
309+
310+ args[0]= job->assigned.arg[0];
311+ args_size[0]= job->assigned.arg_size[0];
312+ args[1]= numerator_string;
313+ args_size[1]= strlen(numerator_string) + 1;
314+ args[2]= denominator_string;
315+ args_size[2]= strlen(denominator_string);
316+
317+ gearman_return_t ret;
318+ ret= gearman_packet_create_args(job->universal(), job->work,
319+ GEARMAN_MAGIC_REQUEST,
320+ GEARMAN_COMMAND_WORK_STATUS,
321+ args, args_size, 3);
322+ if (gearman_failed(ret))
323+ {
324+ return ret;
325+ }
326+
327+ job->options.work_in_use= true;
328 }
329
330- job->options.work_in_use= true;
331+ return _job_send(job);
332 }
333
334- return _job_send(job);
335+ return GEARMAN_SUCCESS;
336 }
337
338 return GEARMAN_INVALID_ARGUMENT;
339@@ -363,12 +382,17 @@
340 {
341 if (job)
342 {
343- if (job->reducer)
344+ if (job->finished() == false)
345 {
346- return GEARMAN_INVALID_ARGUMENT;
347+ if (job->reducer)
348+ {
349+ return GEARMAN_INVALID_ARGUMENT;
350+ }
351+
352+ return gearman_job_send_complete_fin(job, result, result_size);
353 }
354
355- return gearman_job_send_complete_fin(job, result, result_size);
356+ return GEARMAN_SUCCESS;
357 }
358
359 return GEARMAN_INVALID_ARGUMENT;
360@@ -379,66 +403,64 @@
361 {
362 if (job)
363 {
364- if (job->options.finished)
365- {
366- return GEARMAN_SUCCESS;
367- }
368-
369- if (job->reducer)
370- {
371- if (result_size)
372- {
373- gearman_argument_t value= gearman_argument_make(NULL, 0, static_cast<const char *>(result), result_size);
374- job->reducer->add(value);
375- }
376-
377- gearman_return_t rc= job->reducer->complete();
378- if (gearman_failed(rc))
379- {
380- return gearman_error(job->worker->impl()->universal, rc, "The reducer's complete() returned an error");
381- }
382-
383- const gearman_vector_st *reduced_value= job->reducer->result.string();
384- if (reduced_value)
385- {
386- result= gearman_string_value(reduced_value);
387- result_size= gearman_string_length(reduced_value);
388- }
389- else
390- {
391- result= NULL;
392- result_size= 0;
393- }
394- }
395-
396- const void *args[2];
397- size_t args_size[2];
398-
399- if (not (job->options.work_in_use))
400- {
401- args[0]= job->assigned.arg[0];
402- args_size[0]= job->assigned.arg_size[0];
403-
404- args[1]= result;
405- args_size[1]= result_size;
406- gearman_return_t ret= gearman_packet_create_args(job->worker->impl()->universal, job->work,
407- GEARMAN_MAGIC_REQUEST,
408- GEARMAN_COMMAND_WORK_COMPLETE,
409- args, args_size, 2);
410+ if (job->finished() == false)
411+ {
412+ if (job->reducer)
413+ {
414+ if (result_size)
415+ {
416+ gearman_argument_t value= gearman_argument_make(NULL, 0, static_cast<const char *>(result), result_size);
417+ job->reducer->add(value);
418+ }
419+
420+ gearman_return_t rc= job->reducer->complete();
421+ if (gearman_failed(rc))
422+ {
423+ return gearman_error(job->universal(), rc, "The reducer's complete() returned an error");
424+ }
425+
426+ const gearman_vector_st *reduced_value= job->reducer->result.string();
427+ if (reduced_value)
428+ {
429+ result= gearman_string_value(reduced_value);
430+ result_size= gearman_string_length(reduced_value);
431+ }
432+ else
433+ {
434+ result= NULL;
435+ result_size= 0;
436+ }
437+ }
438+
439+ const void *args[2];
440+ size_t args_size[2];
441+
442+ if (not (job->options.work_in_use))
443+ {
444+ args[0]= job->assigned.arg[0];
445+ args_size[0]= job->assigned.arg_size[0];
446+
447+ args[1]= result;
448+ args_size[1]= result_size;
449+ gearman_return_t ret= gearman_packet_create_args(job->_worker.universal, job->work,
450+ GEARMAN_MAGIC_REQUEST,
451+ GEARMAN_COMMAND_WORK_COMPLETE,
452+ args, args_size, 2);
453+ if (gearman_failed(ret))
454+ {
455+ return ret;
456+ }
457+ job->options.work_in_use= true;
458+ }
459+
460+ gearman_return_t ret= _job_send(job);
461 if (gearman_failed(ret))
462 {
463 return ret;
464 }
465- job->options.work_in_use= true;
466- }
467-
468- gearman_return_t ret= _job_send(job);
469- if (gearman_failed(ret))
470- {
471- return ret;
472- }
473-
474- job->options.finished= true;
475+
476+ job->finished(true);
477+ }
478
479 return GEARMAN_SUCCESS;
480 }
481@@ -452,21 +474,26 @@
482 {
483 if (job)
484 {
485- if (not (job->options.work_in_use))
486+ if (job->finished() == false)
487 {
488- gearman_string_t handle_string= { static_cast<const char *>(job->assigned.arg[0]), job->assigned.arg_size[0] };
489- gearman_string_t exception_string= { static_cast<const char *>(exception), exception_size };
490-
491- gearman_return_t ret= libgearman::protocol::work_exception(job->worker->impl()->universal, job->work, handle_string, exception_string);
492- if (gearman_failed(ret))
493+ if (job->options.work_in_use == false)
494 {
495- return ret;
496+ gearman_string_t handle_string= { static_cast<const char *>(job->assigned.arg[0]), job->assigned.arg_size[0] };
497+ gearman_string_t exception_string= { static_cast<const char *>(exception), exception_size };
498+
499+ gearman_return_t ret= libgearman::protocol::work_exception(job->_worker.universal, job->work, handle_string, exception_string);
500+ if (gearman_failed(ret))
501+ {
502+ return ret;
503+ }
504+
505+ job->options.work_in_use= true;
506 }
507
508- job->options.work_in_use= true;
509+ return _job_send(job);
510 }
511
512- return _job_send(job);
513+ return GEARMAN_SUCCESS;
514 }
515
516 return GEARMAN_INVALID_ARGUMENT;
517@@ -476,12 +503,17 @@
518 {
519 if (job)
520 {
521- if (job->reducer)
522+ if (job->finished() == false)
523 {
524- return GEARMAN_INVALID_ARGUMENT;
525+ if (job->reducer)
526+ {
527+ return gearman_error(job->universal(), GEARMAN_INVALID_ARGUMENT, "Job has a reducer");
528+ }
529+
530+ return gearman_job_send_fail_fin(job);
531 }
532
533- return gearman_job_send_fail_fin(job);
534+ return GEARMAN_SUCCESS;
535 }
536
537 return GEARMAN_INVALID_ARGUMENT;
538@@ -494,34 +526,32 @@
539 const void *args[1];
540 size_t args_size[1];
541
542- if (job->options.finished)
543- {
544- return GEARMAN_SUCCESS;
545- }
546-
547- if (not (job->options.work_in_use))
548- {
549- args[0]= job->assigned.arg[0];
550- args_size[0]= job->assigned.arg_size[0] - 1;
551- gearman_return_t ret= gearman_packet_create_args(job->worker->impl()->universal, job->work,
552- GEARMAN_MAGIC_REQUEST,
553- GEARMAN_COMMAND_WORK_FAIL,
554- args, args_size, 1);
555+ if (job->finished() == false)
556+ {
557+ if (not (job->options.work_in_use))
558+ {
559+ args[0]= job->assigned.arg[0];
560+ args_size[0]= job->assigned.arg_size[0] - 1;
561+ gearman_return_t ret= gearman_packet_create_args(job->_worker.universal, job->work,
562+ GEARMAN_MAGIC_REQUEST,
563+ GEARMAN_COMMAND_WORK_FAIL,
564+ args, args_size, 1);
565+ if (gearman_failed(ret))
566+ {
567+ return ret;
568+ }
569+
570+ job->options.work_in_use= true;
571+ }
572+
573+ gearman_return_t ret= _job_send(job);
574 if (gearman_failed(ret))
575 {
576 return ret;
577 }
578
579- job->options.work_in_use= true;
580- }
581-
582- gearman_return_t ret= _job_send(job);
583- if (gearman_failed(ret))
584- {
585- return ret;
586- }
587-
588- job->options.finished= true;
589+ job->finished(true);
590+ }
591
592 return GEARMAN_SUCCESS;
593 }
594@@ -664,9 +694,9 @@
595 gearman_packet_free(&(job->work));
596 }
597
598- if (job->worker->impl()->job_list == job)
599+ if (job->_worker.job_list == job)
600 {
601- job->worker->impl()->job_list= job->next;
602+ job->_worker.job_list= job->next;
603 }
604
605 if (job->prev)
606@@ -678,7 +708,7 @@
607 {
608 job->next->prev= job->prev;
609 }
610- job->worker->impl()->job_count--;
611+ job->_worker.job_count--;
612
613 delete job->reducer;
614 job->reducer= NULL;
615@@ -700,7 +730,7 @@
616
617 while ((ret == GEARMAN_IO_WAIT) or (ret == GEARMAN_TIMEOUT))
618 {
619- ret= gearman_wait(job->worker->impl()->universal);
620+ ret= gearman_wait(job->universal());
621 if (ret == GEARMAN_SUCCESS)
622 {
623 ret= job->con->send_packet(job->work, true);
624@@ -720,9 +750,9 @@
625
626 const char *gearman_job_error(gearman_job_st *job)
627 {
628- if (job and job->worker)
629+ if (job)
630 {
631- return gearman_worker_error(job->worker);
632+ return job->_worker.error();
633 }
634
635 return NULL;
636
637=== modified file 'libgearman/job.hpp'
638--- libgearman/job.hpp 2012-08-19 21:00:10 +0000
639+++ libgearman/job.hpp 2013-07-03 01:51:27 +0000
640@@ -37,15 +37,32 @@
641
642 #pragma once
643
644+#include "libgearman/interface/worker.hpp"
645+
646 struct gearman_job_st
647 {
648+ gearman_job_st(Worker& worker_):
649+ _worker(worker_)
650+ { }
651+
652 struct {
653 bool allocated;
654 bool assigned_in_use;
655 bool work_in_use;
656 bool finished;
657 } options;
658- gearman_worker_st *worker;
659+
660+ bool finished() const
661+ {
662+ return options.finished;
663+ }
664+
665+ void finished(const bool finished_)
666+ {
667+ options.finished= finished_;
668+ }
669+
670+ Worker& _worker;
671 gearman_job_st *next;
672 gearman_job_st *prev;
673 gearman_connection_st *con;
674@@ -53,4 +70,9 @@
675 gearman_packet_st work;
676 struct gearman_job_reducer_st *reducer;
677 gearman_return_t error_code;
678+
679+ gearman_universal_st& universal()
680+ {
681+ return _worker.universal;
682+ }
683 };
684
685=== modified file 'libgearman/worker.cc'
686--- libgearman/worker.cc 2013-06-23 08:23:26 +0000
687+++ libgearman/worker.cc 2013-07-03 01:51:27 +0000
688@@ -971,6 +971,11 @@
689 return GEARMAN_INVALID_ARGUMENT;
690 }
691
692+void gearman_worker_reset_error(Worker& worker)
693+{
694+ universal_reset_error(worker.universal);
695+}
696+
697 void gearman_worker_reset_error(gearman_worker_st *worker)
698 {
699 if (worker and worker->impl())
700@@ -1354,7 +1359,7 @@
701 {
702 if (job)
703 {
704- return gearman_worker_clone(NULL, job->worker);
705+ return gearman_worker_clone(NULL, job->_worker.shell());
706 }
707
708 return NULL;

Subscribers

People subscribed via source and target branches

to all changes: