Merge lp:~brianaker/gearmand/fixes-for-ssl-builds into lp:gearmand

Proposed by Brian Aker on 2013-07-11
Status: Merged
Merged at revision: 824
Proposed branch: lp:~brianaker/gearmand/fixes-for-ssl-builds
Merge into: lp:gearmand
Diff against target: 1207 lines (+334/-232)
26 files modified
libgearman-1.0/constants.h (+2/-1)
libgearman-server/connection.cc (+13/-10)
libgearman-server/connection.h (+2/-2)
libgearman-server/gearmand.cc (+13/-18)
libgearman-server/gearmand_thread.cc (+4/-10)
libgearman-server/io.cc (+40/-44)
libgearman-server/io.h (+5/-3)
libgearman-server/plugins/protocol/gear/protocol.cc (+26/-3)
libgearman-server/server.cc (+5/-2)
libgearman-server/struct/gearmand_thread.h (+6/-1)
libgearman-server/thread.cc (+5/-8)
libgearman/check.h (+5/-0)
libgearman/client.cc (+26/-30)
libgearman/client.hpp (+3/-3)
libgearman/function/function_v2.cc (+2/-1)
libgearman/interface/universal.hpp (+4/-0)
libgearman/interface/worker.hpp (+5/-0)
libgearman/job.cc (+19/-2)
libgearman/strcommand.h (+8/-0)
libgearman/universal.cc (+7/-0)
libgearman/worker.cc (+87/-71)
libgearman/worker.hpp (+21/-0)
tests/libgearman-1.0/gearman_execute_partition.cc (+8/-8)
tests/protocol.cc (+3/-0)
tests/start_worker.cc (+10/-9)
tests/workers/v2/split.cc (+5/-6)
To merge this branch: bzr merge lp:~brianaker/gearmand/fixes-for-ssl-builds
Reviewer: Tangent Trunk | Review Type: (none) | Date Requested: 2013-07-11 | Status: Pending
Review via email: mp+174109@code.launchpad.net
To post a comment you must log in.
824. By Brian Aker on 2013-07-11

OSX build fixes.

825. By Brian Aker on 2013-07-11

Additional OSX fix.

826. By Brian Aker on 2013-07-11

Fix additional OSX warnings.

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'libgearman-1.0/constants.h'
2--- libgearman-1.0/constants.h 2013-06-05 21:59:31 +0000
3+++ libgearman-1.0/constants.h 2013-07-11 21:18:03 +0000
4@@ -133,7 +133,8 @@
5 GEARMAN_WORKER_TIMEOUT_RETURN= (1 << 8),
6 GEARMAN_WORKER_GRAB_ALL= (1 << 9),
7 GEARMAN_WORKER_SSL= (1 << 10),
8- GEARMAN_WORKER_MAX= (1 << 11)
9+ GEARMAN_WORKER_IDENTIFIER= (1 << 11),
10+ GEARMAN_WORKER_MAX= (1 << 12)
11 } gearman_worker_options_t;
12
13 /* Types. */
14
15=== modified file 'libgearman-server/connection.cc'
16--- libgearman-server/connection.cc 2013-06-26 23:50:02 +0000
17+++ libgearman-server/connection.cc 2013-07-11 21:18:03 +0000
18@@ -47,7 +47,8 @@
19
20 #include <string.h>
21 #include <errno.h>
22-#include <assert.h>
23+#include <cassert>
24+#include <algorithm>
25
26 static gearman_server_con_st * _server_con_create(gearman_server_thread_st *thread, gearmand_con_st *dcon,
27 gearmand_error_t& ret);
28@@ -313,16 +314,18 @@
29 return con->id;
30 }
31
32-void gearman_server_con_set_id(gearman_server_con_st *con, char *id,
33- size_t size)
34+void gearman_server_con_set_id(gearman_server_con_st *con,
35+ const char *id,
36+ const size_t size)
37 {
38- if (size >= GEARMAND_SERVER_CON_ID_SIZE)
39- {
40- size= GEARMAND_SERVER_CON_ID_SIZE - 1;
41- }
42-
43- memcpy(con->id, id, size);
44- con->id[size]= 0;
45+ size_t min_size= std::min(size, size_t(GEARMAND_SERVER_CON_ID_SIZE -1));
46+
47+ memcpy(con->id, id, min_size);
48+ con->id[min_size]= 0;
49+
50+ gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
51+ "identifier set to %.*s",
52+ min_size, con->id);
53 }
54
55 void gearman_server_con_free_worker(gearman_server_con_st *con,
56
57=== modified file 'libgearman-server/connection.h'
58--- libgearman-server/connection.h 2013-06-26 23:50:02 +0000
59+++ libgearman-server/connection.h 2013-07-11 21:18:03 +0000
60@@ -111,8 +111,8 @@
61 * Set client id.
62 */
63 GEARMAN_API
64-void gearman_server_con_set_id(gearman_server_con_st *con, char *id,
65- size_t size);
66+void gearman_server_con_set_id(gearman_server_con_st *con, const char *id,
67+ const size_t size);
68
69 /**
70 * Free server worker struction with name for a server connection.
71
72=== modified file 'libgearman-server/gearmand.cc'
73--- libgearman-server/gearmand.cc 2013-07-07 00:16:26 +0000
74+++ libgearman-server/gearmand.cc 2013-07-11 21:18:03 +0000
75@@ -861,15 +861,7 @@
76 Gearmand()->ret= gearmand_perror(local_error, "accept");
77 return;
78 }
79- gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "accept() %d", fd);
80-
81- {
82- int flags= 1;
83- if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &flags, sizeof(flags)) == -1)
84- {
85- gearmand_perror(errno, "setsockopt(SO_KEEPALIVE)");
86- }
87- }
88+ gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "accept() fd:%d", fd);
89
90 /*
91 Since this is numeric, it should never fail. Even if it did we don't want to really error from it.
92@@ -887,6 +879,14 @@
93
94 gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "Accepted connection from %s:%s", host, port_str);
95
96+ {
97+ int flags= 1;
98+ if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &flags, sizeof(flags)) == -1)
99+ {
100+ gearmand_log_perror(GEARMAN_DEFAULT_LOG_PARAM, errno, "%s:%s setsockopt(SO_KEEPALIVE)", host, port_str);
101+ }
102+ }
103+
104 gearmand_error_t ret= gearmand_con_create(Gearmand(), fd, host, port_str, port);
105 if (ret == GEARMAND_MEMORY_ALLOCATION_FAILURE)
106 {
107@@ -915,15 +915,10 @@
108 return gearmand_fatal_perror(errno, "pipe(gearmand->wakeup_fd)");
109 }
110
111- int returned_flags;
112- if ((returned_flags= fcntl(gearmand->wakeup_fd[0], F_GETFL, 0)) == -1)
113- {
114- return gearmand_fatal_perror(errno, "fcntl:F_GETFL");
115- }
116-
117- if (fcntl(gearmand->wakeup_fd[0], F_SETFL, returned_flags | O_NONBLOCK) == -1)
118- {
119- return gearmand_fatal_perror(errno, "fcntl(F_SETFL)");
120+ gearmand_error_t local_ret;
121+ if ((local_ret= gearmand_sockfd_nonblock(gearmand->wakeup_fd[0])))
122+ {
123+ return local_ret;
124 }
125 #endif
126
127
128=== modified file 'libgearman-server/gearmand_thread.cc'
129--- libgearman-server/gearmand_thread.cc 2013-07-10 10:54:09 +0000
130+++ libgearman-server/gearmand_thread.cc 2013-07-11 21:18:03 +0000
131@@ -505,16 +505,10 @@
132 return gearmand_perror(errno, "pipe");
133 }
134
135- int ret= fcntl(thread->wakeup_fd[0], F_GETFL, 0);
136- if (ret == -1)
137- {
138- return gearmand_perror(errno, "fcntl(F_GETFL)");
139- }
140-
141- ret= fcntl(thread->wakeup_fd[0], F_SETFL, ret | O_NONBLOCK);
142- if (ret == -1)
143- {
144- return gearmand_perror(errno, "fcntl(F_SETFL)");
145+ gearmand_error_t local_ret;
146+ if ((local_ret= gearmand_sockfd_nonblock(thread->gearmand().wakeup_fd[0])))
147+ {
148+ return local_ret;
149 }
150 #endif
151
152
153=== modified file 'libgearman-server/io.cc'
154--- libgearman-server/io.cc 2013-07-07 22:07:57 +0000
155+++ libgearman-server/io.cc 2013-07-11 21:18:03 +0000
156@@ -146,10 +146,7 @@
157 if (read_size == 0)
158 {
159 ret= GEARMAND_LOST_CONNECTION;
160- gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM,
161- "Peer connection has called close() %s:%s",
162- connection->host(),
163- connection->port());
164+ gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "Peer connection has called close()");
165 _connection_close(connection);
166 return 0;
167 }
168@@ -177,10 +174,7 @@
169 case EHOSTDOWN:
170 {
171 ret= GEARMAND_LOST_CONNECTION;
172- gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM,
173- "Peer connection has called close() %s:%s",
174- connection->host(),
175- connection->port());
176+ gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "Peer connection has called close()");
177 _connection_close(connection);
178 return 0;
179 }
180@@ -300,9 +294,7 @@
181 char errorString[80];
182 CyaSSL_ERR_error_string(err, errorString);
183 _connection_close(connection);
184- return gearmand_log_gerror(GEARMAN_DEFAULT_LOG_PARAM, GEARMAND_LOST_CONNECTION, "%s:%s SSL failure(%s)",
185- connection->host(),
186- connection->port(),
187+ return gearmand_log_gerror(GEARMAN_DEFAULT_LOG_PARAM, GEARMAND_LOST_CONNECTION, "SSL failure(%s)",
188 errorString);
189 }
190 }
191@@ -317,10 +309,8 @@
192 if (write_size == 0) // detect infinite loop?
193 {
194 ++loop_counter;
195- gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "send() sent zero bytes of %u to peer %s:%s",
196- uint32_t(connection->send_buffer_size),
197- connection->host(),
198- connection->port());
199+ gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "send() sent zero bytes of %u",
200+ uint32_t(connection->send_buffer_size));
201
202 if (loop_counter > 5)
203 {
204@@ -366,10 +356,8 @@
205 return GEARMAND_ERRNO;
206 }
207
208- gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "send() %u bytes to peer %s:%s",
209- uint32_t(write_size),
210- connection->host(),
211- connection->port());
212+ gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "send() %u bytes to peer",
213+ uint32_t(write_size));
214
215 connection->send_buffer_size-= static_cast<size_t>(write_size);
216 if (connection->send_state == gearmand_io_st::GEARMAND_CON_SEND_UNIVERSAL_FLUSH_DATA)
217@@ -775,9 +763,7 @@
218 }
219 return ret;
220 }
221- gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "%s:%s read %lu bytes",
222- connection->host(),
223- connection->port(),
224+ gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "read %lu bytes",
225 (unsigned long)recv_size);
226
227 connection->recv_buffer_size+= recv_size;
228@@ -901,7 +887,6 @@
229
230 static gearmand_error_t _io_setsockopt(gearmand_io_st &connection)
231 {
232- gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "setsockopt() fd:%d", connection.fd);
233 {
234 int setting= 1;
235 if (setsockopt(connection.fd, IPPROTO_TCP, TCP_NODELAY, &setting, (socklen_t)sizeof(int)) and errno != EOPNOTSUPP)
236@@ -969,28 +954,39 @@
237
238 if (SOCK_NONBLOCK == 0)
239 {
240- int flags;
241+ gearmand_error_t local_ret;
242+ if ((local_ret= gearmand_sockfd_nonblock(connection.fd)))
243+ {
244+ return local_ret;
245+ }
246+ }
247+
248+ return GEARMAND_SUCCESS;
249+}
250+
251+gearmand_error_t gearmand_sockfd_nonblock(const int& sockfd)
252+{
253+ int flags;
254+ do
255+ {
256+ flags= fcntl(sockfd, F_GETFL, 0);
257+ } while (flags == -1 and (errno == EINTR or errno == EAGAIN));
258+
259+ if (flags == -1)
260+ {
261+ return gearmand_perror(errno, "fcntl(F_GETFL)");
262+ }
263+ else if ((flags & O_NONBLOCK) == 0)
264+ {
265+ int retval;
266 do
267 {
268- flags= fcntl(connection.fd, F_GETFL, 0);
269- } while (flags == -1 and (errno == EINTR or errno == EAGAIN));
270-
271- if (flags == -1)
272- {
273- return gearmand_perror(errno, "fcntl(F_GETFL)");
274- }
275- else if ((flags & O_NONBLOCK) == 0)
276- {
277- int retval;
278- do
279- {
280- retval= fcntl(connection.fd, F_SETFL, flags | O_NONBLOCK);
281- } while (retval == -1 and (errno == EINTR or errno == EAGAIN));
282-
283- if (retval == -1)
284- {
285- return gearmand_perror(errno, "fcntl(F_SETFL)");
286- }
287+ retval= fcntl(sockfd, F_SETFL, flags | O_NONBLOCK);
288+ } while (retval == -1 and (errno == EINTR or errno == EAGAIN));
289+
290+ if (retval == -1)
291+ {
292+ return gearmand_perror(errno, "fcntl(F_SETFL)");
293 }
294 }
295
296@@ -1016,7 +1012,7 @@
297 }
298 else
299 {
300- gearmand_warning("gearmand_sockfd_close() called with an invalid socket");
301+ gearmand_debug("gearmand_sockfd_close() called with an invalid socket, this was probably ok");
302 }
303 }
304
305
306=== modified file 'libgearman-server/io.h'
307--- libgearman-server/io.h 2013-03-09 22:03:25 +0000
308+++ libgearman-server/io.h 2013-07-11 21:18:03 +0000
309@@ -131,9 +131,11 @@
310 */
311 gearmand_error_t gearmand_io_set_revents(gearman_server_con_st *connection, short revents);
312
313-void gearmand_sockfd_close(int& sockfd);
314-
315-void gearmand_pipe_close(int& sockfd);
316+void gearmand_sockfd_close(int&);
317+
318+void gearmand_pipe_close(int&);
319+
320+gearmand_error_t gearmand_sockfd_nonblock(const int&);
321
322 /** @} */
323
324
325=== modified file 'libgearman-server/plugins/protocol/gear/protocol.cc'
326--- libgearman-server/plugins/protocol/gear/protocol.cc 2013-07-10 10:54:09 +0000
327+++ libgearman-server/plugins/protocol/gear/protocol.cc 2013-07-11 21:18:03 +0000
328@@ -47,6 +47,7 @@
329 #include <libgearman-server/common.h>
330 #include <libgearman/strcommand.h>
331 #include <libgearman-server/packet.h>
332+#include "libgearman/strcommand.h"
333
334 #include <cstdio>
335 #include <cstdlib>
336@@ -103,9 +104,9 @@
337 return false;
338 }
339
340- void notify(gearman_server_con_st *)
341+ void notify(gearman_server_con_st* connection)
342 {
343- gearmand_info("Gear connection disconnected");
344+ gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "Gear connection disconnected: %s:%s", connection->host(), connection->port());
345 }
346
347 size_t unpack(gearmand_packet_st *packet,
348@@ -257,6 +258,24 @@
349 int(packet->arg_size[0]),
350 packet->arg[0]);
351 }
352+ else if (packet->command == GEARMAN_COMMAND_WORK_EXCEPTION)
353+ {
354+ gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
355+ "GEAR gearmand_command_t: %s handle: %.*s exception: %.*s",
356+ gearman_strcommand(packet->command),
357+ int(packet->arg_size[0]),
358+ packet->arg[0],
359+ int(packet->data_size),
360+ packet->data);
361+ }
362+ else if (packet->command == GEARMAN_COMMAND_SET_CLIENT_ID)
363+ {
364+ gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
365+ "GEAR gearmand_command_t: %s identifier: %.*s",
366+ gearman_strcommand(packet->command),
367+ int(packet->arg_size[0]),
368+ packet->arg[0]);
369+ }
370 else
371 {
372 gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
373@@ -353,9 +372,13 @@
374 cyassl_error_buffer, cyassl_error);
375 }
376 }
377- gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "GearSSL connection made: %d", connection->con.fd);
378+ gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "GearSSL connection made: %s:%s", connection->host(), connection->port());
379 }
380+ else
381 #endif
382+ {
383+ gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "Gear connection made: %s:%s", connection->host(), connection->port());
384+ }
385
386 connection->set_protocol(&gear_context);
387
388
389=== modified file 'libgearman-server/server.cc'
390--- libgearman-server/server.cc 2013-07-10 10:54:09 +0000
391+++ libgearman-server/server.cc 2013-07-11 21:18:03 +0000
392@@ -545,7 +545,7 @@
393 long timeout= strtol(strtol_buffer, &endptr, 10);
394 if (timeout == LONG_MIN or timeout == LONG_MAX or errno != 0)
395 {
396- return gearmand_log_perror(GEARMAN_DEFAULT_LOG_PARAM, errno, "GEARMAN_COMMAND_CAN_DO_TIMEOUT:strtol");
397+ return gearmand_log_perror(GEARMAN_DEFAULT_LOG_PARAM, errno, "GEARMAN_COMMAND_CAN_DO_TIMEOUT:strtol: %s", strtol_buffer);
398 }
399
400 gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "Registering function: %.*s with timeout %dl",
401@@ -1042,7 +1042,10 @@
402 }
403 else if (command == GEARMAN_COMMAND_WORK_EXCEPTION)
404 {
405- gearmand_debug("Sending GEARMAN_COMMAND_WORK_EXCEPTION");
406+ gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
407+ "%s:%s GEARMAN_COMMAND_WORK_EXCEPTION: %.*s",
408+ server_client->con->host(), server_client->con->port(),
409+ int(packet->data_size), packet->data);
410 }
411
412 uint8_t *data;
413
414=== modified file 'libgearman-server/struct/gearmand_thread.h'
415--- libgearman-server/struct/gearmand_thread.h 2013-07-06 10:56:43 +0000
416+++ libgearman-server/struct/gearmand_thread.h 2013-07-11 21:18:03 +0000
417@@ -37,7 +37,7 @@
418
419 #pragma once
420
421-class gearmand_st;
422+struct gearmand_st;
423
424 struct gearmand_thread_st
425 {
426@@ -61,4 +61,9 @@
427 pthread_mutex_t lock;
428
429 gearmand_thread_st(gearmand_st&);
430+
431+ gearmand_st& gearmand()
432+ {
433+ return _gearmand;
434+ }
435 };
436
437=== modified file 'libgearman-server/thread.cc'
438--- libgearman-server/thread.cc 2013-07-06 20:11:54 +0000
439+++ libgearman-server/thread.cc 2013-07-11 21:18:03 +0000
440@@ -45,6 +45,7 @@
441 #include "libgearman-server/common.h"
442
443 #include <libgearman/command.h>
444+#include "libgearman/strcommand.h"
445
446 #ifdef __cplusplus
447 # include <cassert>
448@@ -308,10 +309,8 @@
449 }
450
451 gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
452- "Received %s %s:%u",
453- gearmand_strcommand(&con->packet->packet),
454- con->_host == NULL ? "-" : con->_host,
455- con->_port == NULL ? "-" : con->_port);
456+ "Received %s",
457+ gearmand_strcommand(&con->packet->packet));
458
459 /* We read a complete packet. */
460 if (Server->flags.threaded)
461@@ -355,10 +354,8 @@
462 }
463
464 gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
465- "Sent %s to %s:%d",
466- gearman_command_info(con->io_packet_list->packet.command)->name,
467- con->_host == NULL ? "-" : con->_host,
468- con->_port == NULL ? "-" : con->_port);
469+ "Sent %s",
470+ gearman_strcommand(con->io_packet_list->packet.command));
471
472 gearman_server_io_packet_remove(con);
473 }
474
475=== modified file 'libgearman/check.h'
476--- libgearman/check.h 2013-07-10 10:54:09 +0000
477+++ libgearman/check.h 2013-07-11 21:18:03 +0000
478@@ -65,6 +65,11 @@
479 {
480 }
481
482+ const gearman_string_t& option() const
483+ {
484+ return _option;
485+ }
486+
487 gearman_return_t success(gearman_connection_st* con);
488
489 private:
490
491=== modified file 'libgearman/client.cc'
492--- libgearman/client.cc 2013-07-10 10:54:09 +0000
493+++ libgearman/client.cc 2013-07-11 21:18:03 +0000
494@@ -1744,36 +1744,32 @@
495
496 gearman_return_t gearman_client_run_block_tasks(Client* client, gearman_task_st* exit_task)
497 {
498- {
499- if (client->task_list == NULL) // We are immediatly successful if all tasks are completed
500- {
501- return GEARMAN_SUCCESS;
502- }
503-
504- gearman_return_t rc;
505- {
506- PUSH_BLOCKING(client->universal);
507-
508- rc= _client_run_tasks(client->shell(), exit_task);
509- }
510-
511- if (gearman_failed(rc))
512- {
513- if (rc == GEARMAN_COULD_NOT_CONNECT)
514- {
515- gearman_reset(client->universal);
516- }
517-
518- if (client->universal.error_code() != rc and rc != GEARMAN_COULD_NOT_CONNECT)
519- {
520- assert(client->universal.error_code() == rc);
521- }
522- }
523-
524- return rc;
525- }
526-
527- return GEARMAN_INVALID_ARGUMENT;
528+ if (client->task_list == NULL) // We are immediatly successful if all tasks are completed
529+ {
530+ return GEARMAN_SUCCESS;
531+ }
532+
533+ gearman_return_t rc;
534+ {
535+ PUSH_BLOCKING(client->universal);
536+
537+ rc= _client_run_tasks(client->shell(), exit_task);
538+ }
539+
540+ if (gearman_failed(rc))
541+ {
542+ if (rc == GEARMAN_COULD_NOT_CONNECT)
543+ {
544+ gearman_reset(client->universal);
545+ }
546+
547+ if (client->universal.error_code() != rc and rc != GEARMAN_COULD_NOT_CONNECT)
548+ {
549+ assert(client->universal.error_code() == rc);
550+ }
551+ }
552+
553+ return rc;
554 }
555
556 /*
557
558=== modified file 'libgearman/client.hpp'
559--- libgearman/client.hpp 2013-07-08 09:58:12 +0000
560+++ libgearman/client.hpp 2013-07-11 21:18:03 +0000
561@@ -47,7 +47,7 @@
562
563 namespace {
564
565-inline void all_PRINTER(const char *line, gearman_verbose_t verbose, void*)
566+inline void all_client_PRINTER(const char *line, gearman_verbose_t verbose, void*)
567 {
568 fprintf(stderr, "%s:%d %s(%s)\n", __FILE__, __LINE__, gearman_verbose_name(verbose), line);
569 }
570@@ -67,7 +67,6 @@
571 throw std::runtime_error("gearman_client_create() failed");
572 }
573 enable_logging();
574-
575 enable_ssl();
576 }
577
578@@ -95,6 +94,7 @@
579
580 gearman_client_add_server(_client, "localhost", arg);
581
582+ enable_logging();
583 enable_ssl();
584 }
585
586@@ -117,7 +117,7 @@
587 {
588 if (getenv("YATL_CLIENT_LOGGING"))
589 {
590- gearman_log_fn *func= all_PRINTER;
591+ gearman_log_fn *func= all_client_PRINTER;
592 gearman_client_set_log_fn(_client, func, NULL, GEARMAN_VERBOSE_ERROR);
593 }
594 }
595
596=== modified file 'libgearman/function/function_v2.cc'
597--- libgearman/function/function_v2.cc 2013-07-05 22:06:45 +0000
598+++ libgearman/function/function_v2.cc 2013-07-11 21:18:03 +0000
599@@ -59,6 +59,7 @@
600 job->_error_code= GEARMAN_SUCCESS;
601 return GEARMAN_FUNCTION_SHUTDOWN;
602
603+ case GEARMAN_WORK_EXCEPTION:
604 case GEARMAN_FATAL:
605 job->_error_code= GEARMAN_FATAL;
606 return GEARMAN_FUNCTION_FATAL;
607@@ -91,7 +92,6 @@
608 case GEARMAN_WORK_DATA:
609 case GEARMAN_WORK_WARNING:
610 case GEARMAN_WORK_STATUS:
611- case GEARMAN_WORK_EXCEPTION:
612 case GEARMAN_NOT_CONNECTED:
613 case GEARMAN_COULD_NOT_CONNECT:
614 case GEARMAN_SEND_IN_PROGRESS:
615@@ -123,5 +123,6 @@
616 break;
617 }
618
619+ gearman_gerror(job->universal(), GEARMAN_INVALID_ARGUMENT);
620 return GEARMAN_FUNCTION_INVALID_ARGUMENT;
621 }
622
623=== modified file 'libgearman/interface/universal.hpp'
624--- libgearman/interface/universal.hpp 2013-07-08 09:58:12 +0000
625+++ libgearman/interface/universal.hpp 2013-07-11 21:18:03 +0000
626@@ -265,6 +265,10 @@
627 ~gearman_universal_st();
628
629 void identifier(const char *identifier_, const size_t identifier_size_);
630+ bool has_identifier() const
631+ {
632+ return _identifier;
633+ }
634 };
635
636 static inline bool gearman_universal_is_non_blocking(gearman_universal_st &self)
637
638=== modified file 'libgearman/interface/worker.hpp'
639--- libgearman/interface/worker.hpp 2013-07-05 22:06:45 +0000
640+++ libgearman/interface/worker.hpp 2013-07-11 21:18:03 +0000
641@@ -127,6 +127,11 @@
642 universal.options._ssl= ssl_;
643 }
644
645+ bool has_identifier() const
646+ {
647+ return universal.has_identifier();
648+ }
649+
650 const char* error() const
651 {
652 return universal.error();
653
654=== modified file 'libgearman/job.cc'
655--- libgearman/job.cc 2013-07-08 00:50:39 +0000
656+++ libgearman/job.cc 2013-07-11 21:18:03 +0000
657@@ -68,6 +68,16 @@
658 gearman_string_append(reducer_function, gearman_string_param(reducer_function_name));
659 }
660
661+ const char* name() const
662+ {
663+ if (reducer_function)
664+ {
665+ return reducer_function->c_str();
666+ }
667+
668+ return "__UNKNOWN";
669+ }
670+
671 ~gearman_job_reducer_st()
672 {
673 gearman_client_free(client);
674@@ -79,7 +89,8 @@
675 client= gearman_client_create(NULL);
676 if (client)
677 {
678-
679+ gearman_universal_clone(client->impl()->universal, universal, false);
680+#if 0
681 if (universal._namespace)
682 {
683 gearman_client_set_namespace(client,
684@@ -94,6 +105,7 @@
685 return false;
686 }
687 }
688+#endif
689
690 return true;
691 }
692@@ -415,7 +427,7 @@
693 gearman_return_t rc= job->reducer->complete();
694 if (gearman_failed(rc))
695 {
696- return gearman_error(job->universal(), rc, "The reducer's complete() returned an error");
697+ return gearman_universal_set_error(job->universal(), rc, GEARMAN_AT, "%s couldn't call complete()", job->reducer->name());
698 }
699
700 const gearman_vector_st *reduced_value= job->reducer->result.string();
701@@ -472,6 +484,11 @@
702 {
703 if (job)
704 {
705+ if (exception == NULL or exception_size == 0)
706+ {
707+ return gearman_error(job->universal(), GEARMAN_INVALID_ARGUMENT, "No exception was provided");
708+ }
709+
710 if (job->finished() == false)
711 {
712 if (job->options.work_in_use == false)
713
714=== modified file 'libgearman/strcommand.h'
715--- libgearman/strcommand.h 2012-03-31 06:09:14 +0000
716+++ libgearman/strcommand.h 2013-07-11 21:18:03 +0000
717@@ -37,4 +37,12 @@
718
719 #pragma once
720
721+#ifdef __cplusplus
722+extern "C" {
723+#endif
724+
725 const char *gearman_strcommand(gearman_command_t command);
726+
727+#ifdef __cplusplus
728+}
729+#endif
730
731=== modified file 'libgearman/universal.cc'
732--- libgearman/universal.cc 2013-07-10 10:54:09 +0000
733+++ libgearman/universal.cc 2013-07-11 21:18:03 +0000
734@@ -105,6 +105,8 @@
735
736 (void)gearman_universal_set_option(destination, GEARMAN_UNIVERSAL_NON_BLOCKING, source.options.non_blocking);
737
738+ destination.ssl(source.ssl());
739+
740 destination.timeout= source.timeout;
741
742 destination._namespace= gearman_string_clone(source._namespace);
743@@ -374,6 +376,11 @@
744 return gearman_gerror(universal, GEARMAN_SHUTDOWN);
745 }
746
747+ if (read_length == -1)
748+ {
749+ gearman_perror(universal, "read() from shutdown pipe");
750+ }
751+
752 #if 0
753 perror("shudown read");
754 #endif
755
756=== modified file 'libgearman/worker.cc'
757--- libgearman/worker.cc 2013-07-08 00:50:39 +0000
758+++ libgearman/worker.cc 2013-07-11 21:18:03 +0000
759@@ -39,6 +39,7 @@
760 #include "gear_config.h"
761
762 #include <libgearman/common.h>
763+#include "libgearman/uuid.hpp"
764 #include <libgearman/function/base.hpp>
765 #include <libgearman/function/make.hpp>
766
767@@ -258,6 +259,8 @@
768 options|= int(GEARMAN_WORKER_TIMEOUT_RETURN);
769 if (worker->ssl())
770 options|= int(GEARMAN_WORKER_SSL);
771+ if (worker->has_identifier())
772+ options|= int(GEARMAN_WORKER_IDENTIFIER);
773
774 return gearman_worker_options_t(options);
775 }
776@@ -276,6 +279,7 @@
777 GEARMAN_WORKER_GRAB_ALL,
778 GEARMAN_WORKER_TIMEOUT_RETURN,
779 GEARMAN_WORKER_SSL,
780+ GEARMAN_WORKER_IDENTIFIER,
781 GEARMAN_WORKER_MAX
782 };
783
784@@ -333,6 +337,14 @@
785 {
786 worker->ssl(true);
787 }
788+
789+ if (options & GEARMAN_WORKER_IDENTIFIER)
790+ {
791+ char uuid_buffer[GEARMAN_MAX_IDENTIFIER];
792+ size_t length= GEARMAN_MAX_IDENTIFIER;
793+ safe_uuid_generate(uuid_buffer, length);
794+ worker->universal.identifier(uuid_buffer, length);
795+ }
796 }
797 }
798
799@@ -369,9 +381,9 @@
800 worker->options.grab_all= false;
801 }
802
803- if (options & GEARMAN_WORKER_SSL)
804+ if (options & GEARMAN_WORKER_IDENTIFIER)
805 {
806- worker->ssl(false);
807+ worker->universal.identifier(NULL, 0);
808 }
809 }
810 }
811@@ -1240,77 +1252,73 @@
812 uint32_t timeout,
813 void *context)
814 {
815+ const void *args[2];
816+ size_t args_size[2];
817+
818+ if (function_length == 0 or function_name == NULL or function_length > GEARMAN_FUNCTION_MAX_SIZE)
819 {
820- const void *args[2];
821- size_t args_size[2];
822-
823- if (function_length == 0 or function_name == NULL or function_length > GEARMAN_FUNCTION_MAX_SIZE)
824- {
825- if (function_length > GEARMAN_FUNCTION_MAX_SIZE)
826- {
827- gearman_error(worker->universal, GEARMAN_INVALID_ARGUMENT, "function name longer then GEARMAN_MAX_FUNCTION_SIZE");
828- }
829- else
830- {
831- gearman_error(worker->universal, GEARMAN_INVALID_ARGUMENT, "invalid function");
832- }
833-
834- return GEARMAN_INVALID_ARGUMENT;
835- }
836-
837- _worker_function_st *function= make(worker->universal._namespace, function_name, function_length, function_arg, context);
838- if (function == NULL)
839- {
840- gearman_perror(worker->universal, "_worker_function_st::new()");
841- return GEARMAN_MEMORY_ALLOCATION_FAILURE;
842- }
843-
844- gearman_return_t ret;
845- if (timeout > 0)
846- {
847- char timeout_buffer[11];
848- snprintf(timeout_buffer, sizeof(timeout_buffer), "%u", timeout);
849- args[0]= function->name();
850- args_size[0]= function->length() + 1;
851- args[1]= timeout_buffer;
852- args_size[1]= strlen(timeout_buffer);
853- ret= gearman_packet_create_args(worker->universal, function->packet(),
854- GEARMAN_MAGIC_REQUEST,
855- GEARMAN_COMMAND_CAN_DO_TIMEOUT,
856- args, args_size, 2);
857- }
858+ if (function_length > GEARMAN_FUNCTION_MAX_SIZE)
859+ {
860+ gearman_error(worker->universal, GEARMAN_INVALID_ARGUMENT, "function name longer then GEARMAN_MAX_FUNCTION_SIZE");
861+ }
862 else
863 {
864- args[0]= function->name();
865- args_size[0]= function->length();
866- ret= gearman_packet_create_args(worker->universal, function->packet(),
867- GEARMAN_MAGIC_REQUEST, GEARMAN_COMMAND_CAN_DO,
868- args, args_size, 1);
869- }
870-
871- if (gearman_failed(ret))
872- {
873- delete function;
874-
875- return ret;
876- }
877-
878- if (worker->function_list)
879- {
880- worker->function_list->prev= function;
881- }
882-
883- function->next= worker->function_list;
884- function->prev= NULL;
885- worker->function_list= function;
886- worker->function_count++;
887-
888- worker->options.change= true;
889-
890- return GEARMAN_SUCCESS;
891- }
892-
893- return GEARMAN_INVALID_ARGUMENT;
894+ gearman_error(worker->universal, GEARMAN_INVALID_ARGUMENT, "invalid function");
895+ }
896+
897+ return GEARMAN_INVALID_ARGUMENT;
898+ }
899+
900+ _worker_function_st *function= make(worker->universal._namespace, function_name, function_length, function_arg, context);
901+ if (function == NULL)
902+ {
903+ gearman_perror(worker->universal, "_worker_function_st::new()");
904+ return GEARMAN_MEMORY_ALLOCATION_FAILURE;
905+ }
906+
907+ gearman_return_t ret;
908+ if (timeout > 0)
909+ {
910+ char timeout_buffer[11];
911+ snprintf(timeout_buffer, sizeof(timeout_buffer), "%u", timeout);
912+ args[0]= function->name();
913+ args_size[0]= function->length() + 1;
914+ args[1]= timeout_buffer;
915+ args_size[1]= strlen(timeout_buffer);
916+ ret= gearman_packet_create_args(worker->universal, function->packet(),
917+ GEARMAN_MAGIC_REQUEST,
918+ GEARMAN_COMMAND_CAN_DO_TIMEOUT,
919+ args, args_size, 2);
920+ }
921+ else
922+ {
923+ args[0]= function->name();
924+ args_size[0]= function->length();
925+ ret= gearman_packet_create_args(worker->universal, function->packet(),
926+ GEARMAN_MAGIC_REQUEST, GEARMAN_COMMAND_CAN_DO,
927+ args, args_size, 1);
928+ }
929+
930+ if (gearman_failed(ret))
931+ {
932+ delete function;
933+
934+ return ret;
935+ }
936+
937+ if (worker->function_list)
938+ {
939+ worker->function_list->prev= function;
940+ }
941+
942+ function->next= worker->function_list;
943+ function->prev= NULL;
944+ worker->function_list= function;
945+ worker->function_count++;
946+
947+ worker->options.change= true;
948+
949+ return GEARMAN_SUCCESS;
950 }
951
952 static void _worker_function_free(Worker* worker,
953@@ -1354,8 +1362,16 @@
954 {
955 if (worker_shell and worker_shell->impl())
956 {
957+ Worker* worker= worker_shell->impl();
958 gearman_string_t option= { option_arg, option_arg_size };
959- return gearman_request_option(worker_shell->impl()->universal, option);
960+
961+ if (gearman_success(gearman_server_option(worker->universal, option)))
962+ {
963+ if (gearman_request_option(worker->universal, option))
964+ {
965+ return true;
966+ }
967+ }
968 }
969
970 return false;
971
972=== modified file 'libgearman/worker.hpp'
973--- libgearman/worker.hpp 2013-07-06 00:33:10 +0000
974+++ libgearman/worker.hpp 2013-07-11 21:18:03 +0000
975@@ -43,6 +43,15 @@
976
977 #include <stdexcept>
978
979+namespace {
980+
981+inline void all_worker_PRINTER(const char *line, gearman_verbose_t verbose, void*)
982+{
983+ fprintf(stderr, "%s:%d WORKER %s(%s)\n", __FILE__, __LINE__, gearman_verbose_name(verbose), line);
984+}
985+
986+}
987+
988 namespace org { namespace gearmand { namespace libgearman {
989
990 class Worker {
991@@ -55,6 +64,7 @@
992 {
993 throw std::runtime_error("gearman_worker_create() failed");
994 }
995+ enable_logging();
996
997 enable_ssl();
998 }
999@@ -67,6 +77,8 @@
1000 {
1001 throw std::runtime_error("gearman_worker_create() failed");
1002 }
1003+ enable_logging();
1004+
1005 gearman_worker_add_server(_worker, "localhost", arg);
1006
1007 enable_ssl();
1008@@ -87,6 +99,15 @@
1009 gearman_worker_free(_worker);
1010 }
1011
1012+ void enable_logging()
1013+ {
1014+ if (getenv("YATL_WORKER_LOGGING"))
1015+ {
1016+ gearman_log_fn *func= all_worker_PRINTER;
1017+ gearman_worker_set_log_fn(_worker, func, NULL, GEARMAN_VERBOSE_ERROR);
1018+ }
1019+ }
1020+
1021 void enable_ssl()
1022 {
1023 if (getenv("GEARMAND_CA_CERTIFICATE"))
1024
1025=== modified file 'tests/libgearman-1.0/gearman_execute_partition.cc'
1026--- tests/libgearman-1.0/gearman_execute_partition.cc 2013-07-10 10:54:09 +0000
1027+++ tests/libgearman-1.0/gearman_execute_partition.cc 2013-07-11 21:18:03 +0000
1028@@ -55,7 +55,7 @@
1029 #pragma GCC diagnostic ignored "-Wold-style-cast"
1030 #endif
1031
1032-#define WORKER_FUNCTION_NAME "client_test"
1033+#define WORKER_FUNCTION_NAME "partition_client_test"
1034 #define WORKER_SPLIT_FUNCTION_NAME "split_worker"
1035
1036 test_return_t partition_SETUP(void *object)
1037@@ -105,7 +105,7 @@
1038
1039 // Test client as NULL
1040 gearman_task_st *task= gearman_execute_by_partition(NULL,
1041- test_literal_param("split_worker"),
1042+ test_literal_param(WORKER_SPLIT_FUNCTION_NAME),
1043 test_literal_param(WORKER_FUNCTION_NAME),
1044 NULL, 0, // unique
1045 NULL,
1046@@ -141,7 +141,7 @@
1047 gearman_argument_t workload= gearman_argument_make(0, 0, test_literal_param("this dog does not hunt"));
1048
1049 gearman_task_st *task= gearman_execute_by_partition(client,
1050- test_literal_param("split_worker"),
1051+ test_literal_param(WORKER_SPLIT_FUNCTION_NAME),
1052 test_literal_param(WORKER_FUNCTION_NAME),
1053 NULL, 0, // unique
1054 NULL,
1055@@ -151,7 +151,7 @@
1056 gearman_result_st *result= gearman_task_result(task);
1057 ASSERT_TRUE(result);
1058 const char *value= gearman_result_value(result);
1059- test_truth(value);
1060+ ASSERT_TRUE(value);
1061 ASSERT_EQ(18UL, gearman_result_size(result));
1062
1063 gearman_task_free(task);
1064@@ -171,14 +171,14 @@
1065 gearman_argument_t workload= gearman_argument_make(0, 0, test_literal_param("this dog does not hunt mapper_fail"));
1066
1067 gearman_task_st *task= gearman_execute_by_partition(client,
1068- test_literal_param("split_worker"),
1069+ test_literal_param(WORKER_SPLIT_FUNCTION_NAME),
1070 gearman_string_param_cstr(worker_function),
1071 NULL, 0, // unique
1072 NULL,
1073 &workload, 0);
1074 test_true(task);
1075
1076- ASSERT_EQ(GEARMAN_WORK_FAIL, gearman_task_return(task));
1077+ ASSERT_EQ(GEARMAN_WORK_EXCEPTION, gearman_task_return(task));
1078
1079 gearman_task_free(task);
1080 gearman_client_task_free_all(client);
1081@@ -196,7 +196,7 @@
1082 gearman_argument_t workload= gearman_argument_make(0, 0, test_literal_param("this dog does not hunt fail"));
1083
1084 gearman_task_st *task= gearman_execute_by_partition(client,
1085- test_literal_param("split_worker"),
1086+ test_literal_param(WORKER_SPLIT_FUNCTION_NAME),
1087 gearman_string_param_cstr(worker_function),
1088 NULL, 0, // unique
1089 NULL,
1090@@ -224,7 +224,7 @@
1091 gearman_argument_t workload= gearman_argument_make(0, 0, test_literal_param("this dog does not hunt"));
1092
1093 gearman_task_st *task= gearman_execute(client,
1094- test_literal_param("split_worker"),
1095+ test_literal_param(WORKER_SPLIT_FUNCTION_NAME),
1096 NULL, 0, // unique
1097 NULL,
1098 &workload, 0);
1099
1100=== modified file 'tests/protocol.cc'
1101--- tests/protocol.cc 2013-06-27 22:23:44 +0000
1102+++ tests/protocol.cc 2013-07-11 21:18:03 +0000
1103@@ -70,6 +70,7 @@
1104
1105 gearman_universal_st universal;
1106 gearman_set_log_fn(universal, error_logger, NULL, GEARMAN_VERBOSE_ERROR);
1107+ universal.ssl(libtest::is_ssl());
1108
1109 gearman_packet_st message;
1110
1111@@ -111,6 +112,7 @@
1112
1113 gearman_universal_st universal;
1114 gearman_set_log_fn(universal, error_logger, NULL, GEARMAN_VERBOSE_ERROR);
1115+ universal.ssl(libtest::is_ssl());
1116
1117 gearman_packet_st message;
1118
1119@@ -152,6 +154,7 @@
1120
1121 gearman_universal_st universal;
1122 gearman_set_log_fn(universal, error_logger, NULL, GEARMAN_VERBOSE_ERROR);
1123+ universal.ssl(libtest::is_ssl());
1124
1125 gearman_packet_st message;
1126
1127
1128=== modified file 'tests/start_worker.cc'
1129--- tests/start_worker.cc 2013-05-07 09:50:42 +0000
1130+++ tests/start_worker.cc 2013-07-11 21:18:03 +0000
1131@@ -165,16 +165,9 @@
1132 gearman_worker_set_namespace(&worker, context->namespace_key.c_str(), context->namespace_key.length());
1133 }
1134
1135- // Check for a working server by "asking" it for an option
1136+ // Set worker id
1137 {
1138- size_t count= 5;
1139- bool success= false;
1140- while (--count and success == false)
1141- {
1142- success= gearman_worker_set_server_option(&worker, test_literal_param("exceptions"));
1143- }
1144-
1145- if (success == false)
1146+ if (gearman_failed(gearman_worker_set_identifier(&worker, gearman_literal_param("start_worker"))))
1147 {
1148 Out << "gearman_worker_set_server_option() failed";
1149 context->fail();
1150@@ -182,6 +175,14 @@
1151 }
1152 }
1153
1154+ // Check for a working server by pinging it with echo
1155+ if (gearman_failed(gearman_worker_echo(&worker, gearman_literal_param("start_worker"))))
1156+ {
1157+ Out << "gearman_worker_echo() failed";
1158+ context->fail();
1159+ return;
1160+ }
1161+
1162 if (gearman_failed(gearman_worker_define_function(&worker,
1163 context->function_name.c_str(), context->function_name.length(),
1164 context->worker_fn,
1165
1166=== modified file 'tests/workers/v2/split.cc'
1167--- tests/workers/v2/split.cc 2012-10-29 20:00:10 +0000
1168+++ tests/workers/v2/split.cc 2013-07-11 21:18:03 +0000
1169@@ -62,16 +62,16 @@
1170 {
1171 if (int(workload[x]) == 0 or int(workload[x]) == int(' '))
1172 {
1173- if ((workload +x -chunk_begin) == 11 and not memcmp(chunk_begin, test_literal_param("mapper_fail")))
1174+ if ((workload +x -chunk_begin) == 11 and memcmp(chunk_begin, test_literal_param("mapper_fail")) == 0)
1175 {
1176- return GEARMAN_FAIL;
1177+ return gearman_job_send_exception(job, test_literal_param("Error occured on purpose"));
1178 }
1179
1180 // NULL Chunk
1181 gearman_return_t rc= gearman_job_send_data(job, chunk_begin, workload +x -chunk_begin);
1182 if (gearman_failed(rc))
1183 {
1184- return GEARMAN_FAIL;
1185+ return gearman_job_send_exception(job, test_literal_param("gearman_job_send_data() failed"));
1186 }
1187
1188 chunk_begin= workload +x +1;
1189@@ -82,16 +82,15 @@
1190 {
1191 if ((size_t(workload +workload_size) -size_t(chunk_begin) ) == 11 and not memcmp(chunk_begin, test_literal_param("mapper_fail")))
1192 {
1193- return GEARMAN_FAIL;
1194+ return gearman_job_send_exception(job, test_literal_param("Error occured on purpose"));
1195 }
1196
1197 gearman_return_t rc= gearman_job_send_data(job, chunk_begin, size_t(workload +workload_size) -size_t(chunk_begin));
1198 if (gearman_failed(rc))
1199 {
1200- return GEARMAN_FAIL;
1201+ return gearman_job_send_exception(job, test_literal_param("gearman_job_send_data() failed"));
1202 }
1203 }
1204
1205 return GEARMAN_SUCCESS;
1206 }
1207-

Subscribers

People subscribed via source and target branches

to all changes: