Merge lp:~brianaker/gearmand/ssl-wait into lp:gearmand

Proposed by Brian Aker on 2013-07-06
Status: Merged
Merged at revision: 810
Proposed branch: lp:~brianaker/gearmand/ssl-wait
Merge into: lp:gearmand
Diff against target: 429 lines (+128/-54) (has conflicts)
6 files modified
libgearman-server/plugins/protocol/gear/protocol.cc (+9/-0)
libgearman-server/thread.cc (+1/-1)
libgearman/connection.cc (+8/-3)
libgearman/interface/task.hpp (+5/-0)
libgearman/run.cc (+22/-14)
tests/libgearman-1.0/worker_test.cc (+83/-36)
Text conflict in libgearman/connection.cc
To merge this branch: bzr merge lp:~brianaker/gearmand/ssl-wait
Reviewer: Tangent Trunk | Review Type: (none) | Date Requested: 2013-07-06 | Status: Pending
Review via email: mp+173302@code.launchpad.net
To post a comment you must log in.

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'libgearman-server/plugins/protocol/gear/protocol.cc'
2--- libgearman-server/plugins/protocol/gear/protocol.cc 2013-07-06 00:33:10 +0000
3+++ libgearman-server/plugins/protocol/gear/protocol.cc 2013-07-06 09:21:25 +0000
4@@ -248,6 +248,15 @@
5 int(packet->data_size),
6 packet->data);
7 }
8+ else if (packet->command == GEARMAN_COMMAND_OPTION_REQ)
9+ {
10+ gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
11+ "GEAR length: %" PRIu64 " gearmand_command_t: %s option: %.*s",
12+ uint64_t(packet->data_size),
13+ gearman_strcommand(packet->command),
14+ int(packet->data_size),
15+ packet->data);
16+ }
17 else
18 {
19 gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
20
21=== modified file 'libgearman-server/thread.cc'
22--- libgearman-server/thread.cc 2013-05-05 03:23:55 +0000
23+++ libgearman-server/thread.cc 2013-07-06 09:21:25 +0000
24@@ -309,7 +309,7 @@
25
26 gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
27 "Received %s %s:%u",
28- gearman_command_info(con->packet->packet.command)->name,
29+ gearmand_strcommand(&con->packet->packet),
30 con->_host == NULL ? "-" : con->_host,
31 con->_port == NULL ? "-" : con->_port);
32
33
34=== modified file 'libgearman/connection.cc'
35--- libgearman/connection.cc 2013-07-06 00:33:10 +0000
36+++ libgearman/connection.cc 2013-07-06 09:21:25 +0000
37@@ -75,8 +75,8 @@
38 # define FD_CLOEXEC 0
39 #endif
40
41-#ifndef MSG_DONTWAIT
42-# define MSG_DONTWAIT 0
43+#ifndef MSG_NOSIGNAL
44+# define MSG_NOSIGNAL 0
45 #endif
46
47
48@@ -1105,8 +1105,13 @@
49 #if defined(HAVE_CYASSL) && HAVE_CYASSL
50 if (_ssl)
51 {
52+<<<<<<< TREE
53 read_size= CyaSSL_recv(_ssl, data, data_size, MSG_DONTWAIT);
54 if (read_size <= 0)
55+=======
56+ read_size= CyaSSL_recv(_ssl, data, data_size, MSG_NOSIGNAL);
57+ if (read_size < 0)
58+>>>>>>> MERGE-SOURCE
59 {
60 int sendErr= CyaSSL_get_error(_ssl, read_size);
61 if (sendErr != SSL_ERROR_WANT_READ)
62@@ -1226,7 +1231,7 @@
63 int rval;
64 do
65 {
66- rval= fcntl (fd, F_SETFD, flags | FD_CLOEXEC);
67+ rval= fcntl(fd, F_SETFD, flags | FD_CLOEXEC);
68 } while (rval == -1 && (errno == EINTR or errno == EAGAIN));
69
70 if (rval == -1)
71
72=== modified file 'libgearman/interface/task.hpp'
73--- libgearman/interface/task.hpp 2013-05-07 09:45:37 +0000
74+++ libgearman/interface/task.hpp 2013-07-06 09:21:25 +0000
75@@ -154,6 +154,11 @@
76
77 bool create_result(size_t initial_size);
78
79+ void set_state(const enum gearman_task_state_t state_)
80+ {
81+ state= state_;
82+ }
83+
84 private:
85 gearman_task_st* _shell;
86 gearman_task_st _owned_shell;
87
88=== modified file 'libgearman/run.cc'
89--- libgearman/run.cc 2013-06-27 02:11:30 +0000
90+++ libgearman/run.cc 2013-07-06 09:21:25 +0000
91@@ -103,7 +103,7 @@
92 }
93 else if (ret == GEARMAN_IO_WAIT)
94 {
95- task->state= GEARMAN_TASK_STATE_SUBMIT;
96+ task->set_state(GEARMAN_TASK_STATE_SUBMIT);
97 return ret;
98 }
99 else if (gearman_failed(ret))
100@@ -134,12 +134,12 @@
101
102 if (ret == GEARMAN_COULD_NOT_CONNECT) // If no connection is found, we will let the user try again
103 {
104- task->state= GEARMAN_TASK_STATE_NEW;
105+ task->set_state(GEARMAN_TASK_STATE_NEW);
106 task->client->impl()->new_tasks++;
107 }
108 else
109 {
110- task->state= GEARMAN_TASK_STATE_FAIL;
111+ task->set_state(GEARMAN_TASK_STATE_FAIL);
112 task->client->impl()->running_tasks--;
113 }
114 return ret;
115@@ -166,13 +166,13 @@
116 gearman_return_t ret= task->func.workload_fn(task->shell());
117 if (gearman_failed(ret))
118 {
119- task->state= GEARMAN_TASK_STATE_WORKLOAD;
120+ task->set_state(GEARMAN_TASK_STATE_WORKLOAD);
121 return ret;
122 }
123 }
124
125 task->client->impl()->options.no_new= false;
126- task->state= GEARMAN_TASK_STATE_WORK;
127+ task->set_state(GEARMAN_TASK_STATE_WORK);
128 task->con->set_events(POLLIN);
129 return GEARMAN_SUCCESS;
130
131@@ -190,7 +190,7 @@
132 gearman_return_t ret= task->func.created_fn(task->shell());
133 if (gearman_failed(ret))
134 {
135- task->state= GEARMAN_TASK_STATE_CREATED;
136+ task->set_state(GEARMAN_TASK_STATE_CREATED);
137 return ret;
138 }
139 }
140@@ -201,6 +201,7 @@
141 task->send.command == GEARMAN_COMMAND_SUBMIT_JOB_EPOCH ||
142 task->send.command == GEARMAN_COMMAND_SUBMIT_REDUCE_JOB_BACKGROUND)
143 {
144+ task->result_rc= GEARMAN_SUCCESS;
145 break;
146 }
147 }
148@@ -215,7 +216,7 @@
149 gearman_return_t ret= task->func.data_fn(task->shell());
150 if (gearman_failed(ret))
151 {
152- task->state= GEARMAN_TASK_STATE_DATA;
153+ task->set_state(GEARMAN_TASK_STATE_DATA);
154 return ret;
155 }
156 }
157@@ -228,7 +229,7 @@
158 gearman_return_t ret= task->func.warning_fn(task->shell());
159 if (gearman_failed(ret))
160 {
161- task->state= GEARMAN_TASK_STATE_WARNING;
162+ task->set_state(GEARMAN_TASK_STATE_WARNING);
163 return ret;
164 }
165 }
166@@ -318,7 +319,7 @@
167 gearman_return_t ret= task->func.status_fn(task->shell());
168 if (gearman_failed(ret))
169 {
170- task->state= GEARMAN_TASK_STATE_STATUS;
171+ task->set_state(GEARMAN_TASK_STATE_STATUS);
172 return ret;
173 }
174 }
175@@ -345,7 +346,7 @@
176 gearman_return_t ret= task->func.complete_fn(task->shell());
177 if (gearman_failed(ret))
178 {
179- task->state= GEARMAN_TASK_STATE_COMPLETE;
180+ task->set_state(GEARMAN_TASK_STATE_COMPLETE);
181 return ret;
182 }
183 }
184@@ -369,7 +370,7 @@
185 gearman_return_t ret= task->func.exception_fn(task->shell());
186 if (gearman_failed(ret))
187 {
188- task->state= GEARMAN_TASK_STATE_EXCEPTION;
189+ task->set_state(GEARMAN_TASK_STATE_EXCEPTION);
190 return ret;
191 }
192 }
193@@ -391,7 +392,7 @@
194 gearman_return_t ret= task->func.fail_fn(task->shell());
195 if (gearman_failed(ret))
196 {
197- task->state= GEARMAN_TASK_STATE_FAIL;
198+ task->set_state(GEARMAN_TASK_STATE_FAIL);
199 return ret;
200 }
201 }
202@@ -399,7 +400,7 @@
203 break;
204 }
205
206- task->state= GEARMAN_TASK_STATE_WORK;
207+ task->set_state(GEARMAN_TASK_STATE_WORK);
208 return GEARMAN_SUCCESS;
209
210 case GEARMAN_TASK_STATE_FINISHED:
211@@ -407,7 +408,14 @@
212 }
213
214 task->client->impl()->running_tasks--;
215- task->state= GEARMAN_TASK_STATE_FINISHED;
216+ task->set_state(GEARMAN_TASK_STATE_FINISHED);
217+
218+ // @todo this should never happen... but background tasks can signal it.
219+ if (task->result_rc == GEARMAN_UNKNOWN_STATE and task->client->impl()->universal.error())
220+ {
221+ fprintf(stderr, "%s:%u %s\n", __FILE__, __LINE__, task->client->impl()->universal.error());
222+ }
223+ assert(task->result_rc != GEARMAN_UNKNOWN_STATE);
224
225 if (task->client->impl()->options.free_tasks and task->type == GEARMAN_TASK_KIND_ADD_TASK)
226 {
227
228=== modified file 'tests/libgearman-1.0/worker_test.cc'
229--- tests/libgearman-1.0/worker_test.cc 2013-07-06 08:02:19 +0000
230+++ tests/libgearman-1.0/worker_test.cc 2013-07-06 09:21:25 +0000
231@@ -66,6 +66,19 @@
232 #include "tests/workers/v2/call_exception.h"
233 #include "tests/workers/v2/check_order.h"
234
235+#if 0
236+static gearman_return_t exception_fn(gearman_task_st* task)
237+{
238+ Out << "GEARMAN_WORK_EXCEPTION: Task Handle: " << gearman_task_job_handle(task) << " return:" << gearman_strerror(gearman_task_return(task));
239+ return GEARMAN_SUCCESS;
240+}
241+#endif
242+
243+static void error_logger(const char* message, gearman_verbose_t, void*)
244+{
245+ Error << message;
246+}
247+
248 static test_return_t init_test(void *)
249 {
250 gearman_worker_st worker;
251@@ -420,7 +433,6 @@
252 static test_return_t job_order_TEST(void *)
253 {
254 libgearman::Client client(libtest::default_port());;
255- ASSERT_EQ(true, gearman_client_set_server_option(&client, test_literal_param("exceptions")));
256 gearman_client_add_options(&client, GEARMAN_CLIENT_EXCEPTION);
257 gearman_client_add_options(&client, GEARMAN_CLIENT_GENERATE_UNIQUE);
258
259@@ -534,13 +546,16 @@
260 for (std::vector<gearman_task_st*>::iterator iter= tasks.begin();
261 iter != tasks.end(); ++iter)
262 {
263- if (gearman_task_return(*iter) != GEARMAN_UNKNOWN_STATE)
264+
265+ if (gearman_failed(gearman_task_return(*iter)))
266 {
267- Error << ":" << gearman_task_error(*iter) << ":";
268- Error << ":" << gearman_strerror(gearman_task_return(*iter)) << ":";
269+ if (gearman_task_return(*iter) != GEARMAN_UNKNOWN_STATE)
270+ {
271+ Error << "gearman_task_error(" << gearman_task_error(*iter) << ") gearman_task_return(" << gearman_strerror(gearman_task_return(*iter)) << ")";
272+ ASSERT_EQ(GEARMAN_UNKNOWN_STATE, gearman_task_return(*iter));
273+ ASSERT_NULL(gearman_task_error(*iter));
274+ }
275 }
276- ASSERT_EQ(GEARMAN_UNKNOWN_STATE, gearman_task_return(*iter));
277- ASSERT_NULL(gearman_task_error(*iter));
278 }
279
280 for (uint32_t x= 0; x < 10; ++x)
281@@ -759,12 +774,6 @@
282 return TEST_SUCCESS;
283 }
284
285-static gearman_return_t exception_fn(gearman_task_st* task)
286-{
287- Out << "Task Handle: " << gearman_task_job_handle(task);
288- return GEARMAN_SUCCESS;
289-}
290-
291 static test_return_t gearman_job_send_exception_mass_TEST(void *)
292 {
293 gearman_function_t call_exception_WORKER_FN= gearman_function_create(call_exception_WORKER);
294@@ -779,8 +788,14 @@
295 std::vector<gearman_task_st*> tasks;
296 libgearman::Client client(libtest::default_port());
297
298+#if 0
299 gearman_exception_fn *func= exception_fn;
300 gearman_client_set_exception_fn(&client, func);
301+#endif
302+#if 0
303+ ASSERT_EQ(true, gearman_client_set_server_option(&client, test_literal_param("exceptions")));
304+#endif
305+ gearman_client_add_options(&client, GEARMAN_CLIENT_EXCEPTION);
306
307 for (size_t x= 0; x < 100; ++x)
308 {
309@@ -800,33 +815,59 @@
310 tasks.push_back(task);
311 }
312
313- {
314- gearman_return_t ret;
315- do {
316- ret= gearman_client_run_tasks(&client);
317- } while (gearman_continue(ret));
318- }
319-
320- for (std::vector<gearman_task_st*>::iterator iter= tasks.begin();
321- iter != tasks.end(); ++iter)
322- {
323- ASSERT_EQ(GEARMAN_WORK_FAIL, gearman_task_return(*iter));
324+ bool more= true;
325+ while (more)
326+ {
327+ for (std::vector<gearman_task_st*>::iterator iter= tasks.begin();
328+ iter != tasks.end(); ++iter)
329+ {
330+ if (gearman_task_return(*iter) == GEARMAN_UNKNOWN_STATE)
331+ {
332+ {
333+ gearman_return_t ret;
334+ do {
335+ ret= gearman_client_run_tasks(&client);
336+ } while (gearman_continue(ret));
337+
338+ if (gearman_failed(ret))
339+ {
340+ Error << gearman_strerror(ret);
341+ }
342+ ASSERT_EQ(GEARMAN_SUCCESS, ret);
343+ }
344+
345+ continue;
346+ }
347+ else
348+ {
349+ if (gearman_client_has_option(&client, GEARMAN_CLIENT_EXCEPTION))
350+ {
351+ ASSERT_EQ(GEARMAN_WORK_EXCEPTION, gearman_task_return(*iter));
352+ }
353+ else
354+ {
355+ ASSERT_EQ(GEARMAN_WORK_FAIL, gearman_task_return(*iter));
356+ }
357+ }
358+
359+ more= false;
360+ }
361 }
362
363 return TEST_SUCCESS;
364 }
365
366-static void error_logger(const char* message, gearman_verbose_t, void*)
367-{
368- Error << message;
369-}
370-
371 static test_return_t gearman_job_send_exception_TEST(void *)
372 {
373 libgearman::Client client(libtest::default_port());
374 gearman_client_set_log_fn(&client, error_logger, NULL, GEARMAN_VERBOSE_ERROR);
375
376- ASSERT_EQ(true, gearman_client_set_server_option(&client, test_literal_param("exceptions")));
377+#if 0
378+ gearman_exception_fn *func= exception_fn;
379+ gearman_client_set_exception_fn(&client, func);
380+#endif
381+
382+ gearman_client_add_options(&client, GEARMAN_CLIENT_EXCEPTION);
383
384 gearman_function_t exception_WORKER_FN= gearman_function_create(exception_WORKER);
385 std::auto_ptr<worker_handle_st> handle(test_worker_start(libtest::default_port(),
386@@ -848,9 +889,16 @@
387 ret= gearman_client_run_tasks(&client);
388 } while (gearman_continue(ret));
389
390+ if (gearman_client_has_option(&client, GEARMAN_CLIENT_EXCEPTION))
391+ {
392+ ASSERT_EQ(GEARMAN_WORK_EXCEPTION, gearman_task_return(task));
393+ }
394+ else
395+ {
396+ ASSERT_EQ(GEARMAN_WORK_FAIL, gearman_task_return(task));
397+ }
398+ ASSERT_TRUE(gearman_task_has_exception(task));
399 gearman_string_t exception= gearman_task_exception(task);
400- ASSERT_EQ(gearman_task_return(task), GEARMAN_WORK_EXCEPTION);
401- ASSERT_TRUE(gearman_task_has_exception(task));
402 ASSERT_STREQ("dog", gearman_c_str(exception));
403
404 return TEST_SUCCESS;
405@@ -1252,20 +1300,19 @@
406 {
407 libgearman::Client client(libtest::default_port());
408
409- ASSERT_EQ(gearman_client_do_background(&client, function_name, unique_name,
410- test_string_make_from_array(unique_name), NULL),
411- GEARMAN_SUCCESS);
412+ ASSERT_EQ(GEARMAN_SUCCESS,
413+ gearman_client_do_background(&client, function_name, unique_name, test_string_make_from_array(unique_name), NULL));
414 }
415
416 gearman_worker_remove_options(&worker, GEARMAN_WORKER_GRAB_UNIQ);
417- test_false(worker->impl()->options.grab_uniq);
418+ ASSERT_FALSE(worker->impl()->options.grab_uniq);
419
420 gearman_worker_set_timeout(&worker, 800);
421
422 gearman_return_t rc;
423 gearman_job_st *job= gearman_worker_grab_job(&worker, NULL, &rc);
424 ASSERT_EQ(rc, GEARMAN_SUCCESS);
425- test_truth(job);
426+ ASSERT_TRUE(job);
427
428 size_t size= 0;
429 void *result= no_unique_worker(job, NULL, &size, &rc);

Subscribers

People subscribed via source and target branches

to all changes: