Merge lp:~brianaker/gearmand/misc-fixes2 into lp:gearmand

Proposed by Brian Aker
Status: Merged
Merged at revision: 810
Proposed branch: lp:~brianaker/gearmand/misc-fixes2
Merge into: lp:gearmand
Diff against target: 420 lines (+120/-55)
6 files modified
libgearman-server/plugins/protocol/gear/protocol.cc (+9/-0)
libgearman-server/thread.cc (+1/-1)
libgearman/connection.cc (+4/-4)
libgearman/interface/task.hpp (+5/-0)
libgearman/run.cc (+18/-14)
tests/libgearman-1.0/worker_test.cc (+83/-36)
To merge this branch: bzr merge lp:~brianaker/gearmand/misc-fixes2
Reviewer | Review Type | Date Requested | Status
Tangent Trunk | | | Pending
Review via email: mp+173319@code.launchpad.net
To post a comment you must log in.

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'libgearman-server/plugins/protocol/gear/protocol.cc'
2--- libgearman-server/plugins/protocol/gear/protocol.cc 2013-07-06 09:37:11 +0000
3+++ libgearman-server/plugins/protocol/gear/protocol.cc 2013-07-06 17:04:24 +0000
4@@ -248,6 +248,15 @@
5 int(packet->data_size),
6 packet->data);
7 }
8+ else if (packet->command == GEARMAN_COMMAND_OPTION_REQ)
9+ {
10+ gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
11+ "GEAR length: %" PRIu64 " gearmand_command_t: %s option: %.*s",
12+ uint64_t(packet->data_size),
13+ gearman_strcommand(packet->command),
14+ int(packet->data_size),
15+ packet->data);
16+ }
17 else
18 {
19 gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
20
21=== modified file 'libgearman-server/thread.cc'
22--- libgearman-server/thread.cc 2013-05-05 03:23:55 +0000
23+++ libgearman-server/thread.cc 2013-07-06 17:04:24 +0000
24@@ -309,7 +309,7 @@
25
26 gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
27 "Received %s %s:%u",
28- gearman_command_info(con->packet->packet.command)->name,
29+ gearmand_strcommand(&con->packet->packet),
30 con->_host == NULL ? "-" : con->_host,
31 con->_port == NULL ? "-" : con->_port);
32
33
34=== modified file 'libgearman/connection.cc'
35--- libgearman/connection.cc 2013-07-06 00:33:10 +0000
36+++ libgearman/connection.cc 2013-07-06 17:04:24 +0000
37@@ -75,8 +75,8 @@
38 # define FD_CLOEXEC 0
39 #endif
40
41-#ifndef MSG_DONTWAIT
42-# define MSG_DONTWAIT 0
43+#ifndef MSG_NOSIGNAL
44+# define MSG_NOSIGNAL 0
45 #endif
46
47
48@@ -1105,7 +1105,7 @@
49 #if defined(HAVE_CYASSL) && HAVE_CYASSL
50 if (_ssl)
51 {
52- read_size= CyaSSL_recv(_ssl, data, data_size, MSG_DONTWAIT);
53+ read_size= CyaSSL_recv(_ssl, data, data_size, MSG_NOSIGNAL);
54 if (read_size <= 0)
55 {
56 int sendErr= CyaSSL_get_error(_ssl, read_size);
57@@ -1226,7 +1226,7 @@
58 int rval;
59 do
60 {
61- rval= fcntl (fd, F_SETFD, flags | FD_CLOEXEC);
62+ rval= fcntl(fd, F_SETFD, flags | FD_CLOEXEC);
63 } while (rval == -1 && (errno == EINTR or errno == EAGAIN));
64
65 if (rval == -1)
66
67=== modified file 'libgearman/interface/task.hpp'
68--- libgearman/interface/task.hpp 2013-05-07 09:45:37 +0000
69+++ libgearman/interface/task.hpp 2013-07-06 17:04:24 +0000
70@@ -154,6 +154,11 @@
71
72 bool create_result(size_t initial_size);
73
74+ void set_state(const enum gearman_task_state_t state_)
75+ {
76+ state= state_;
77+ }
78+
79 private:
80 gearman_task_st* _shell;
81 gearman_task_st _owned_shell;
82
83=== modified file 'libgearman/run.cc'
84--- libgearman/run.cc 2013-06-27 02:11:30 +0000
85+++ libgearman/run.cc 2013-07-06 17:04:24 +0000
86@@ -103,7 +103,7 @@
87 }
88 else if (ret == GEARMAN_IO_WAIT)
89 {
90- task->state= GEARMAN_TASK_STATE_SUBMIT;
91+ task->set_state(GEARMAN_TASK_STATE_SUBMIT);
92 return ret;
93 }
94 else if (gearman_failed(ret))
95@@ -134,12 +134,12 @@
96
97 if (ret == GEARMAN_COULD_NOT_CONNECT) // If no connection is found, we will let the user try again
98 {
99- task->state= GEARMAN_TASK_STATE_NEW;
100+ task->set_state(GEARMAN_TASK_STATE_NEW);
101 task->client->impl()->new_tasks++;
102 }
103 else
104 {
105- task->state= GEARMAN_TASK_STATE_FAIL;
106+ task->set_state(GEARMAN_TASK_STATE_FAIL);
107 task->client->impl()->running_tasks--;
108 }
109 return ret;
110@@ -166,13 +166,13 @@
111 gearman_return_t ret= task->func.workload_fn(task->shell());
112 if (gearman_failed(ret))
113 {
114- task->state= GEARMAN_TASK_STATE_WORKLOAD;
115+ task->set_state(GEARMAN_TASK_STATE_WORKLOAD);
116 return ret;
117 }
118 }
119
120 task->client->impl()->options.no_new= false;
121- task->state= GEARMAN_TASK_STATE_WORK;
122+ task->set_state(GEARMAN_TASK_STATE_WORK);
123 task->con->set_events(POLLIN);
124 return GEARMAN_SUCCESS;
125
126@@ -190,7 +190,7 @@
127 gearman_return_t ret= task->func.created_fn(task->shell());
128 if (gearman_failed(ret))
129 {
130- task->state= GEARMAN_TASK_STATE_CREATED;
131+ task->set_state(GEARMAN_TASK_STATE_CREATED);
132 return ret;
133 }
134 }
135@@ -201,6 +201,7 @@
136 task->send.command == GEARMAN_COMMAND_SUBMIT_JOB_EPOCH ||
137 task->send.command == GEARMAN_COMMAND_SUBMIT_REDUCE_JOB_BACKGROUND)
138 {
139+ task->result_rc= GEARMAN_SUCCESS;
140 break;
141 }
142 }
143@@ -215,7 +216,7 @@
144 gearman_return_t ret= task->func.data_fn(task->shell());
145 if (gearman_failed(ret))
146 {
147- task->state= GEARMAN_TASK_STATE_DATA;
148+ task->set_state(GEARMAN_TASK_STATE_DATA);
149 return ret;
150 }
151 }
152@@ -228,7 +229,7 @@
153 gearman_return_t ret= task->func.warning_fn(task->shell());
154 if (gearman_failed(ret))
155 {
156- task->state= GEARMAN_TASK_STATE_WARNING;
157+ task->set_state(GEARMAN_TASK_STATE_WARNING);
158 return ret;
159 }
160 }
161@@ -318,7 +319,7 @@
162 gearman_return_t ret= task->func.status_fn(task->shell());
163 if (gearman_failed(ret))
164 {
165- task->state= GEARMAN_TASK_STATE_STATUS;
166+ task->set_state(GEARMAN_TASK_STATE_STATUS);
167 return ret;
168 }
169 }
170@@ -345,7 +346,7 @@
171 gearman_return_t ret= task->func.complete_fn(task->shell());
172 if (gearman_failed(ret))
173 {
174- task->state= GEARMAN_TASK_STATE_COMPLETE;
175+ task->set_state(GEARMAN_TASK_STATE_COMPLETE);
176 return ret;
177 }
178 }
179@@ -369,7 +370,7 @@
180 gearman_return_t ret= task->func.exception_fn(task->shell());
181 if (gearman_failed(ret))
182 {
183- task->state= GEARMAN_TASK_STATE_EXCEPTION;
184+ task->set_state(GEARMAN_TASK_STATE_EXCEPTION);
185 return ret;
186 }
187 }
188@@ -391,7 +392,7 @@
189 gearman_return_t ret= task->func.fail_fn(task->shell());
190 if (gearman_failed(ret))
191 {
192- task->state= GEARMAN_TASK_STATE_FAIL;
193+ task->set_state(GEARMAN_TASK_STATE_FAIL);
194 return ret;
195 }
196 }
197@@ -399,7 +400,7 @@
198 break;
199 }
200
201- task->state= GEARMAN_TASK_STATE_WORK;
202+ task->set_state(GEARMAN_TASK_STATE_WORK);
203 return GEARMAN_SUCCESS;
204
205 case GEARMAN_TASK_STATE_FINISHED:
206@@ -407,7 +408,10 @@
207 }
208
209 task->client->impl()->running_tasks--;
210- task->state= GEARMAN_TASK_STATE_FINISHED;
211+ task->set_state(GEARMAN_TASK_STATE_FINISHED);
212+
213+ // @todo this should never happen... but background tasks can signal it.
214+ assert(task->result_rc != GEARMAN_UNKNOWN_STATE);
215
216 if (task->client->impl()->options.free_tasks and task->type == GEARMAN_TASK_KIND_ADD_TASK)
217 {
218
219=== modified file 'tests/libgearman-1.0/worker_test.cc'
220--- tests/libgearman-1.0/worker_test.cc 2013-07-06 08:02:19 +0000
221+++ tests/libgearman-1.0/worker_test.cc 2013-07-06 17:04:24 +0000
222@@ -66,6 +66,19 @@
223 #include "tests/workers/v2/call_exception.h"
224 #include "tests/workers/v2/check_order.h"
225
226+#if 0
227+static gearman_return_t exception_fn(gearman_task_st* task)
228+{
229+ Out << "GEARMAN_WORK_EXCEPTION: Task Handle: " << gearman_task_job_handle(task) << " return:" << gearman_strerror(gearman_task_return(task));
230+ return GEARMAN_SUCCESS;
231+}
232+#endif
233+
234+static void error_logger(const char* message, gearman_verbose_t, void*)
235+{
236+ Error << message;
237+}
238+
239 static test_return_t init_test(void *)
240 {
241 gearman_worker_st worker;
242@@ -420,7 +433,6 @@
243 static test_return_t job_order_TEST(void *)
244 {
245 libgearman::Client client(libtest::default_port());;
246- ASSERT_EQ(true, gearman_client_set_server_option(&client, test_literal_param("exceptions")));
247 gearman_client_add_options(&client, GEARMAN_CLIENT_EXCEPTION);
248 gearman_client_add_options(&client, GEARMAN_CLIENT_GENERATE_UNIQUE);
249
250@@ -534,13 +546,16 @@
251 for (std::vector<gearman_task_st*>::iterator iter= tasks.begin();
252 iter != tasks.end(); ++iter)
253 {
254- if (gearman_task_return(*iter) != GEARMAN_UNKNOWN_STATE)
255+
256+ if (gearman_failed(gearman_task_return(*iter)))
257 {
258- Error << ":" << gearman_task_error(*iter) << ":";
259- Error << ":" << gearman_strerror(gearman_task_return(*iter)) << ":";
260+ if (gearman_task_return(*iter) != GEARMAN_UNKNOWN_STATE)
261+ {
262+ Error << "gearman_task_error(" << gearman_task_error(*iter) << ") gearman_task_return(" << gearman_strerror(gearman_task_return(*iter)) << ")";
263+ ASSERT_EQ(GEARMAN_UNKNOWN_STATE, gearman_task_return(*iter));
264+ ASSERT_NULL(gearman_task_error(*iter));
265+ }
266 }
267- ASSERT_EQ(GEARMAN_UNKNOWN_STATE, gearman_task_return(*iter));
268- ASSERT_NULL(gearman_task_error(*iter));
269 }
270
271 for (uint32_t x= 0; x < 10; ++x)
272@@ -759,12 +774,6 @@
273 return TEST_SUCCESS;
274 }
275
276-static gearman_return_t exception_fn(gearman_task_st* task)
277-{
278- Out << "Task Handle: " << gearman_task_job_handle(task);
279- return GEARMAN_SUCCESS;
280-}
281-
282 static test_return_t gearman_job_send_exception_mass_TEST(void *)
283 {
284 gearman_function_t call_exception_WORKER_FN= gearman_function_create(call_exception_WORKER);
285@@ -779,8 +788,14 @@
286 std::vector<gearman_task_st*> tasks;
287 libgearman::Client client(libtest::default_port());
288
289+#if 0
290 gearman_exception_fn *func= exception_fn;
291 gearman_client_set_exception_fn(&client, func);
292+#endif
293+#if 0
294+ ASSERT_EQ(true, gearman_client_set_server_option(&client, test_literal_param("exceptions")));
295+#endif
296+ gearman_client_add_options(&client, GEARMAN_CLIENT_EXCEPTION);
297
298 for (size_t x= 0; x < 100; ++x)
299 {
300@@ -800,33 +815,59 @@
301 tasks.push_back(task);
302 }
303
304- {
305- gearman_return_t ret;
306- do {
307- ret= gearman_client_run_tasks(&client);
308- } while (gearman_continue(ret));
309- }
310-
311- for (std::vector<gearman_task_st*>::iterator iter= tasks.begin();
312- iter != tasks.end(); ++iter)
313- {
314- ASSERT_EQ(GEARMAN_WORK_FAIL, gearman_task_return(*iter));
315+ bool more= true;
316+ while (more)
317+ {
318+ for (std::vector<gearman_task_st*>::iterator iter= tasks.begin();
319+ iter != tasks.end(); ++iter)
320+ {
321+ if (gearman_task_return(*iter) == GEARMAN_UNKNOWN_STATE)
322+ {
323+ {
324+ gearman_return_t ret;
325+ do {
326+ ret= gearman_client_run_tasks(&client);
327+ } while (gearman_continue(ret));
328+
329+ if (gearman_failed(ret))
330+ {
331+ Error << gearman_strerror(ret);
332+ }
333+ ASSERT_EQ(GEARMAN_SUCCESS, ret);
334+ }
335+
336+ continue;
337+ }
338+ else
339+ {
340+ if (gearman_client_has_option(&client, GEARMAN_CLIENT_EXCEPTION))
341+ {
342+ ASSERT_EQ(GEARMAN_WORK_EXCEPTION, gearman_task_return(*iter));
343+ }
344+ else
345+ {
346+ ASSERT_EQ(GEARMAN_WORK_FAIL, gearman_task_return(*iter));
347+ }
348+ }
349+
350+ more= false;
351+ }
352 }
353
354 return TEST_SUCCESS;
355 }
356
357-static void error_logger(const char* message, gearman_verbose_t, void*)
358-{
359- Error << message;
360-}
361-
362 static test_return_t gearman_job_send_exception_TEST(void *)
363 {
364 libgearman::Client client(libtest::default_port());
365 gearman_client_set_log_fn(&client, error_logger, NULL, GEARMAN_VERBOSE_ERROR);
366
367- ASSERT_EQ(true, gearman_client_set_server_option(&client, test_literal_param("exceptions")));
368+#if 0
369+ gearman_exception_fn *func= exception_fn;
370+ gearman_client_set_exception_fn(&client, func);
371+#endif
372+
373+ gearman_client_add_options(&client, GEARMAN_CLIENT_EXCEPTION);
374
375 gearman_function_t exception_WORKER_FN= gearman_function_create(exception_WORKER);
376 std::auto_ptr<worker_handle_st> handle(test_worker_start(libtest::default_port(),
377@@ -848,9 +889,16 @@
378 ret= gearman_client_run_tasks(&client);
379 } while (gearman_continue(ret));
380
381+ if (gearman_client_has_option(&client, GEARMAN_CLIENT_EXCEPTION))
382+ {
383+ ASSERT_EQ(GEARMAN_WORK_EXCEPTION, gearman_task_return(task));
384+ }
385+ else
386+ {
387+ ASSERT_EQ(GEARMAN_WORK_FAIL, gearman_task_return(task));
388+ }
389+ ASSERT_TRUE(gearman_task_has_exception(task));
390 gearman_string_t exception= gearman_task_exception(task);
391- ASSERT_EQ(gearman_task_return(task), GEARMAN_WORK_EXCEPTION);
392- ASSERT_TRUE(gearman_task_has_exception(task));
393 ASSERT_STREQ("dog", gearman_c_str(exception));
394
395 return TEST_SUCCESS;
396@@ -1252,20 +1300,19 @@
397 {
398 libgearman::Client client(libtest::default_port());
399
400- ASSERT_EQ(gearman_client_do_background(&client, function_name, unique_name,
401- test_string_make_from_array(unique_name), NULL),
402- GEARMAN_SUCCESS);
403+ ASSERT_EQ(GEARMAN_SUCCESS,
404+ gearman_client_do_background(&client, function_name, unique_name, test_string_make_from_array(unique_name), NULL));
405 }
406
407 gearman_worker_remove_options(&worker, GEARMAN_WORKER_GRAB_UNIQ);
408- test_false(worker->impl()->options.grab_uniq);
409+ ASSERT_FALSE(worker->impl()->options.grab_uniq);
410
411 gearman_worker_set_timeout(&worker, 800);
412
413 gearman_return_t rc;
414 gearman_job_st *job= gearman_worker_grab_job(&worker, NULL, &rc);
415 ASSERT_EQ(rc, GEARMAN_SUCCESS);
416- test_truth(job);
417+ ASSERT_TRUE(job);
418
419 size_t size= 0;
420 void *result= no_unique_worker(job, NULL, &size, &rc);

Subscribers

People subscribed via source and target branches

to all changes: