Merge lp:~brianaker/gearmand/job-cleanup-for-exception into lp:gearmand

Proposed by Brian Aker on 2013-07-06
Status: Merged
Merged at revision: 810
Proposed branch: lp:~brianaker/gearmand/job-cleanup-for-exception
Merge into: lp:gearmand
Diff against target: 391 lines (+120/-51)
5 files modified
libgearman-server/plugins/protocol/gear/protocol.cc (+9/-0)
libgearman-server/thread.cc (+1/-1)
libgearman/interface/task.hpp (+5/-0)
libgearman/run.cc (+22/-14)
tests/libgearman-1.0/worker_test.cc (+83/-36)
To merge this branch: bzr merge lp:~brianaker/gearmand/job-cleanup-for-exception
Reviewer: Tangent Trunk | Review type: (none given) | Date requested: 2013-07-06 | Status: Pending
Review via email: mp+173298@code.launchpad.net
To post a comment you must log in.

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'libgearman-server/plugins/protocol/gear/protocol.cc'
2--- libgearman-server/plugins/protocol/gear/protocol.cc 2013-07-06 00:33:10 +0000
3+++ libgearman-server/plugins/protocol/gear/protocol.cc 2013-07-06 05:33:27 +0000
4@@ -248,6 +248,15 @@
5 int(packet->data_size),
6 packet->data);
7 }
8+ else if (packet->command == GEARMAN_COMMAND_OPTION_REQ)
9+ {
10+ gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
11+ "GEAR length: %" PRIu64 " gearmand_command_t: %s option: %.*s",
12+ uint64_t(packet->data_size),
13+ gearman_strcommand(packet->command),
14+ int(packet->data_size),
15+ packet->data);
16+ }
17 else
18 {
19 gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
20
21=== modified file 'libgearman-server/thread.cc'
22--- libgearman-server/thread.cc 2013-05-05 03:23:55 +0000
23+++ libgearman-server/thread.cc 2013-07-06 05:33:27 +0000
24@@ -309,7 +309,7 @@
25
26 gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
27 "Received %s %s:%u",
28- gearman_command_info(con->packet->packet.command)->name,
29+ gearmand_strcommand(&con->packet->packet),
30 con->_host == NULL ? "-" : con->_host,
31 con->_port == NULL ? "-" : con->_port);
32
33
34=== modified file 'libgearman/interface/task.hpp'
35--- libgearman/interface/task.hpp 2013-05-07 09:45:37 +0000
36+++ libgearman/interface/task.hpp 2013-07-06 05:33:27 +0000
37@@ -154,6 +154,11 @@
38
39 bool create_result(size_t initial_size);
40
41+ void set_state(const enum gearman_task_state_t state_)
42+ {
43+ state= state_;
44+ }
45+
46 private:
47 gearman_task_st* _shell;
48 gearman_task_st _owned_shell;
49
50=== modified file 'libgearman/run.cc'
51--- libgearman/run.cc 2013-06-15 19:22:33 +0000
52+++ libgearman/run.cc 2013-07-06 05:33:27 +0000
53@@ -103,7 +103,7 @@
54 }
55 else if (ret == GEARMAN_IO_WAIT)
56 {
57- task->state= GEARMAN_TASK_STATE_SUBMIT;
58+ task->set_state(GEARMAN_TASK_STATE_SUBMIT);
59 return ret;
60 }
61 else if (gearman_failed(ret))
62@@ -134,12 +134,12 @@
63
64 if (ret == GEARMAN_COULD_NOT_CONNECT) // If no connection is found, we will let the user try again
65 {
66- task->state= GEARMAN_TASK_STATE_NEW;
67+ task->set_state(GEARMAN_TASK_STATE_NEW);
68 task->client->impl()->new_tasks++;
69 }
70 else
71 {
72- task->state= GEARMAN_TASK_STATE_FAIL;
73+ task->set_state(GEARMAN_TASK_STATE_FAIL);
74 task->client->impl()->running_tasks--;
75 }
76 return ret;
77@@ -166,13 +166,13 @@
78 gearman_return_t ret= task->func.workload_fn(task->shell());
79 if (gearman_failed(ret))
80 {
81- task->state= GEARMAN_TASK_STATE_WORKLOAD;
82+ task->set_state(GEARMAN_TASK_STATE_WORKLOAD);
83 return ret;
84 }
85 }
86
87 task->client->impl()->options.no_new= false;
88- task->state= GEARMAN_TASK_STATE_WORK;
89+ task->set_state(GEARMAN_TASK_STATE_WORK);
90 task->con->set_events(POLLIN);
91 return GEARMAN_SUCCESS;
92
93@@ -190,7 +190,7 @@
94 gearman_return_t ret= task->func.created_fn(task->shell());
95 if (gearman_failed(ret))
96 {
97- task->state= GEARMAN_TASK_STATE_CREATED;
98+ task->set_state(GEARMAN_TASK_STATE_CREATED);
99 return ret;
100 }
101 }
102@@ -201,6 +201,7 @@
103 task->send.command == GEARMAN_COMMAND_SUBMIT_JOB_EPOCH ||
104 task->send.command == GEARMAN_COMMAND_SUBMIT_REDUCE_JOB_BACKGROUND)
105 {
106+ task->result_rc= GEARMAN_SUCCESS;
107 break;
108 }
109 }
110@@ -215,7 +216,7 @@
111 gearman_return_t ret= task->func.data_fn(task->shell());
112 if (gearman_failed(ret))
113 {
114- task->state= GEARMAN_TASK_STATE_DATA;
115+ task->set_state(GEARMAN_TASK_STATE_DATA);
116 return ret;
117 }
118 }
119@@ -228,7 +229,7 @@
120 gearman_return_t ret= task->func.warning_fn(task->shell());
121 if (gearman_failed(ret))
122 {
123- task->state= GEARMAN_TASK_STATE_WARNING;
124+ task->set_state(GEARMAN_TASK_STATE_WARNING);
125 return ret;
126 }
127 }
128@@ -318,7 +319,7 @@
129 gearman_return_t ret= task->func.status_fn(task->shell());
130 if (gearman_failed(ret))
131 {
132- task->state= GEARMAN_TASK_STATE_STATUS;
133+ task->set_state(GEARMAN_TASK_STATE_STATUS);
134 return ret;
135 }
136 }
137@@ -345,7 +346,7 @@
138 gearman_return_t ret= task->func.complete_fn(task->shell());
139 if (gearman_failed(ret))
140 {
141- task->state= GEARMAN_TASK_STATE_COMPLETE;
142+ task->set_state(GEARMAN_TASK_STATE_COMPLETE);
143 return ret;
144 }
145 }
146@@ -370,7 +371,7 @@
147 gearman_return_t ret= task->func.exception_fn(task->shell());
148 if (gearman_failed(ret))
149 {
150- task->state= GEARMAN_TASK_STATE_EXCEPTION;
151+ task->set_state(GEARMAN_TASK_STATE_EXCEPTION);
152 return ret;
153 }
154 }
155@@ -392,7 +393,7 @@
156 gearman_return_t ret= task->func.fail_fn(task->shell());
157 if (gearman_failed(ret))
158 {
159- task->state= GEARMAN_TASK_STATE_FAIL;
160+ task->set_state(GEARMAN_TASK_STATE_FAIL);
161 return ret;
162 }
163 }
164@@ -400,7 +401,7 @@
165 break;
166 }
167
168- task->state= GEARMAN_TASK_STATE_WORK;
169+ task->set_state(GEARMAN_TASK_STATE_WORK);
170 return GEARMAN_SUCCESS;
171
172 case GEARMAN_TASK_STATE_FINISHED:
173@@ -408,7 +409,14 @@
174 }
175
176 task->client->impl()->running_tasks--;
177- task->state= GEARMAN_TASK_STATE_FINISHED;
178+ task->set_state(GEARMAN_TASK_STATE_FINISHED);
179+
180+ // @todo this should never happen... but background tasks can signal it.
181+ if (task->result_rc == GEARMAN_UNKNOWN_STATE and task->client->impl()->universal.error())
182+ {
183+ fprintf(stderr, "%s:%u %s\n", __FILE__, __LINE__, task->client->impl()->universal.error());
184+ }
185+ assert(task->result_rc != GEARMAN_UNKNOWN_STATE);
186
187 if (task->client->impl()->options.free_tasks and task->type == GEARMAN_TASK_KIND_ADD_TASK)
188 {
189
190=== modified file 'tests/libgearman-1.0/worker_test.cc'
191--- tests/libgearman-1.0/worker_test.cc 2013-07-05 22:06:45 +0000
192+++ tests/libgearman-1.0/worker_test.cc 2013-07-06 05:33:27 +0000
193@@ -66,6 +66,19 @@
194 #include "tests/workers/v2/call_exception.h"
195 #include "tests/workers/v2/check_order.h"
196
197+#if 0
198+static gearman_return_t exception_fn(gearman_task_st* task)
199+{
200+ Out << "GEARMAN_WORK_EXCEPTION: Task Handle: " << gearman_task_job_handle(task) << " return:" << gearman_strerror(gearman_task_return(task));
201+ return GEARMAN_SUCCESS;
202+}
203+#endif
204+
205+static void error_logger(const char* message, gearman_verbose_t, void*)
206+{
207+ Error << message;
208+}
209+
210 static test_return_t init_test(void *)
211 {
212 gearman_worker_st worker;
213@@ -420,7 +433,6 @@
214 static test_return_t job_order_TEST(void *)
215 {
216 libgearman::Client client(libtest::default_port());;
217- ASSERT_EQ(true, gearman_client_set_server_option(&client, test_literal_param("exceptions")));
218 gearman_client_add_options(&client, GEARMAN_CLIENT_EXCEPTION);
219 gearman_client_add_options(&client, GEARMAN_CLIENT_GENERATE_UNIQUE);
220
221@@ -534,13 +546,16 @@
222 for (std::vector<gearman_task_st*>::iterator iter= tasks.begin();
223 iter != tasks.end(); ++iter)
224 {
225- if (gearman_task_return(*iter) != GEARMAN_UNKNOWN_STATE)
226+
227+ if (gearman_failed(gearman_task_return(*iter)))
228 {
229- Error << ":" << gearman_task_error(*iter) << ":";
230- Error << ":" << gearman_strerror(gearman_task_return(*iter)) << ":";
231+ if (gearman_task_return(*iter) != GEARMAN_UNKNOWN_STATE)
232+ {
233+ Error << "gearman_task_error(" << gearman_task_error(*iter) << ") gearman_task_return(" << gearman_strerror(gearman_task_return(*iter)) << ")";
234+ ASSERT_EQ(GEARMAN_UNKNOWN_STATE, gearman_task_return(*iter));
235+ ASSERT_NULL(gearman_task_error(*iter));
236+ }
237 }
238- ASSERT_EQ(GEARMAN_UNKNOWN_STATE, gearman_task_return(*iter));
239- ASSERT_NULL(gearman_task_error(*iter));
240 }
241
242 for (uint32_t x= 0; x < 10; ++x)
243@@ -759,12 +774,6 @@
244 return TEST_SUCCESS;
245 }
246
247-static gearman_return_t exception_fn(gearman_task_st* task)
248-{
249- Out << "Task Handle: " << gearman_task_job_handle(task);
250- return GEARMAN_SUCCESS;
251-}
252-
253 static test_return_t gearman_job_send_exception_mass_TEST(void *)
254 {
255 gearman_function_t call_exception_WORKER_FN= gearman_function_create(call_exception_WORKER);
256@@ -779,8 +788,14 @@
257 std::vector<gearman_task_st*> tasks;
258 libgearman::Client client(libtest::default_port());
259
260+#if 0
261 gearman_exception_fn *func= exception_fn;
262 gearman_client_set_exception_fn(&client, func);
263+#endif
264+#if 0
265+ ASSERT_EQ(true, gearman_client_set_server_option(&client, test_literal_param("exceptions")));
266+#endif
267+ gearman_client_add_options(&client, GEARMAN_CLIENT_EXCEPTION);
268
269 for (size_t x= 0; x < 100; ++x)
270 {
271@@ -800,33 +815,59 @@
272 tasks.push_back(task);
273 }
274
275- {
276- gearman_return_t ret;
277- do {
278- ret= gearman_client_run_tasks(&client);
279- } while (gearman_continue(ret));
280- }
281-
282- for (std::vector<gearman_task_st*>::iterator iter= tasks.begin();
283- iter != tasks.end(); ++iter)
284- {
285- ASSERT_EQ(GEARMAN_WORK_FAIL, gearman_task_return(*iter));
286+ bool more= true;
287+ while (more)
288+ {
289+ for (std::vector<gearman_task_st*>::iterator iter= tasks.begin();
290+ iter != tasks.end(); ++iter)
291+ {
292+ if (gearman_task_return(*iter) == GEARMAN_UNKNOWN_STATE)
293+ {
294+ {
295+ gearman_return_t ret;
296+ do {
297+ ret= gearman_client_run_tasks(&client);
298+ } while (gearman_continue(ret));
299+
300+ if (gearman_failed(ret))
301+ {
302+ Error << gearman_strerror(ret);
303+ }
304+ ASSERT_EQ(GEARMAN_SUCCESS, ret);
305+ }
306+
307+ continue;
308+ }
309+ else
310+ {
311+ if (gearman_client_has_option(&client, GEARMAN_CLIENT_EXCEPTION))
312+ {
313+ ASSERT_EQ(GEARMAN_WORK_EXCEPTION, gearman_task_return(*iter));
314+ }
315+ else
316+ {
317+ ASSERT_EQ(GEARMAN_WORK_FAIL, gearman_task_return(*iter));
318+ }
319+ }
320+
321+ more= false;
322+ }
323 }
324
325 return TEST_SUCCESS;
326 }
327
328-static void error_logger(const char* message, gearman_verbose_t, void*)
329-{
330- Error << message;
331-}
332-
333 static test_return_t gearman_job_send_exception_TEST(void *)
334 {
335 libgearman::Client client(libtest::default_port());
336 gearman_client_set_log_fn(&client, error_logger, NULL, GEARMAN_VERBOSE_ERROR);
337
338- ASSERT_EQ(true, gearman_client_set_server_option(&client, test_literal_param("exceptions")));
339+#if 0
340+ gearman_exception_fn *func= exception_fn;
341+ gearman_client_set_exception_fn(&client, func);
342+#endif
343+
344+ gearman_client_add_options(&client, GEARMAN_CLIENT_EXCEPTION);
345
346 gearman_function_t exception_WORKER_FN= gearman_function_create(exception_WORKER);
347 std::auto_ptr<worker_handle_st> handle(test_worker_start(libtest::default_port(),
348@@ -848,9 +889,16 @@
349 ret= gearman_client_run_tasks(&client);
350 } while (gearman_continue(ret));
351
352+ if (gearman_client_has_option(&client, GEARMAN_CLIENT_EXCEPTION))
353+ {
354+ ASSERT_EQ(GEARMAN_WORK_EXCEPTION, gearman_task_return(task));
355+ }
356+ else
357+ {
358+ ASSERT_EQ(GEARMAN_WORK_FAIL, gearman_task_return(task));
359+ }
360+ ASSERT_TRUE(gearman_task_has_exception(task));
361 gearman_string_t exception= gearman_task_exception(task);
362- ASSERT_EQ(gearman_task_return(task), GEARMAN_WORK_EXCEPTION);
363- ASSERT_TRUE(gearman_task_has_exception(task));
364 ASSERT_STREQ("dog", gearman_c_str(exception));
365
366 return TEST_SUCCESS;
367@@ -1252,20 +1300,19 @@
368 {
369 libgearman::Client client(libtest::default_port());
370
371- ASSERT_EQ(gearman_client_do_background(&client, function_name, unique_name,
372- test_string_make_from_array(unique_name), NULL),
373- GEARMAN_SUCCESS);
374+ ASSERT_EQ(GEARMAN_SUCCESS,
375+ gearman_client_do_background(&client, function_name, unique_name, test_string_make_from_array(unique_name), NULL));
376 }
377
378 gearman_worker_remove_options(&worker, GEARMAN_WORKER_GRAB_UNIQ);
379- test_false(worker->impl()->options.grab_uniq);
380+ ASSERT_FALSE(worker->impl()->options.grab_uniq);
381
382 gearman_worker_set_timeout(&worker, 800);
383
384 gearman_return_t rc;
385 gearman_job_st *job= gearman_worker_grab_job(&worker, NULL, &rc);
386 ASSERT_EQ(rc, GEARMAN_SUCCESS);
387- test_truth(job);
388+ ASSERT_TRUE(job);
389
390 size_t size= 0;
391 void *result= no_unique_worker(job, NULL, &size, &rc);

Subscribers

People subscribed via source and target branches

to all changes: