Merge lp:~brianaker/gearmand/ref-client-fix into lp:gearmand

Proposed by Brian Aker
Status: Merged
Merged at revision: 811
Proposed branch: lp:~brianaker/gearmand/ref-client-fix
Merge into: lp:gearmand
Diff against target: 1247 lines (+290/-251) (has conflicts)
7 files modified
libgearman/actions.cc (+3/-3)
libgearman/add.cc (+24/-25)
libgearman/client.cc (+210/-199)
libgearman/interface/task.hpp (+3/-3)
libgearman/run.cc (+37/-13)
libgearman/task.cc (+11/-8)
libgearman/task.hpp (+2/-0)
Text conflict in libgearman/run.cc
To merge this branch: bzr merge lp:~brianaker/gearmand/ref-client-fix
Reviewer | Review Type | Date Requested | Status
Tangent Trunk | | | Pending
Review via email: mp+173325@code.launchpad.net
To post a comment you must log in.

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'libgearman/actions.cc'
--- libgearman/actions.cc 2013-07-06 08:02:19 +0000
+++ libgearman/actions.cc 2013-07-06 19:18:28 +0000
@@ -68,7 +68,7 @@
68 {68 {
69 if (task->create_result(gearman_task_data_size(shell)) == false)69 if (task->create_result(gearman_task_data_size(shell)) == false)
70 {70 {
71 return gearman_error(task->client->impl()->universal, GEARMAN_MEMORY_ALLOCATION_FAILURE, "Failed to create result_st");71 return gearman_error(task->client->universal, GEARMAN_MEMORY_ALLOCATION_FAILURE, "Failed to create result_st");
72 }72 }
73 }73 }
74 assert_msg(task->result(), "programmer error, result_ptr has not been allocated for task");74 assert_msg(task->result(), "programmer error, result_ptr has not been allocated for task");
@@ -137,7 +137,7 @@
137 {137 {
138 if (task->create_result(gearman_task_data_size(shell)) == false)138 if (task->create_result(gearman_task_data_size(shell)) == false)
139 {139 {
140 return gearman_error(task->client->impl()->universal, GEARMAN_MEMORY_ALLOCATION_FAILURE, "Failed to create result_st");140 return gearman_error(task->client->universal, GEARMAN_MEMORY_ALLOCATION_FAILURE, "Failed to create result_st");
141 }141 }
142 }142 }
143143
@@ -158,7 +158,7 @@
158 {158 {
159 if (task->create_result(gearman_task_data_size(shell)) == false)159 if (task->create_result(gearman_task_data_size(shell)) == false)
160 {160 {
161 return gearman_error(task->client->impl()->universal, GEARMAN_MEMORY_ALLOCATION_FAILURE, "Failed to create result_st");161 return gearman_error(task->client->universal, GEARMAN_MEMORY_ALLOCATION_FAILURE, "Failed to create result_st");
162 }162 }
163 }163 }
164164
165165
=== modified file 'libgearman/add.cc'
--- libgearman/add.cc 2013-07-03 09:00:14 +0000
+++ libgearman/add.cc 2013-07-06 19:18:28 +0000
@@ -202,7 +202,6 @@
202 return NULL;202 return NULL;
203 }203 }
204 assert(task_shell->impl()->client);204 assert(task_shell->impl()->client);
205 assert(task_shell->impl()->client == &client);
206205
207 Task* task= task_shell->impl();206 Task* task= task_shell->impl();
208207
@@ -229,7 +228,7 @@
229 {228 {
230 if (safe_uuid_generate(task->unique, task->unique_length) == -1)229 if (safe_uuid_generate(task->unique, task->unique_length) == -1)
231 {230 {
232 gearman_log_debug(task->client->impl()->universal, "uuid_generate_time_safe() failed or does not exist on this platform");231 gearman_log_debug(task->client->universal, "uuid_generate_time_safe() failed or does not exist on this platform");
233 }232 }
234 }233 }
235 else234 else
@@ -242,7 +241,6 @@
242 gearman_unique_t final_unique= gearman_unique_make(task->unique, task->unique_length);241 gearman_unique_t final_unique= gearman_unique_make(task->unique, task->unique_length);
243242
244 assert(task->client);243 assert(task->client);
245 assert(task->client == &client);
246244
247 gearman_return_t rc= GEARMAN_INVALID_ARGUMENT;245 gearman_return_t rc= GEARMAN_INVALID_ARGUMENT;
248 switch (command)246 switch (command)
@@ -250,7 +248,7 @@
250 case GEARMAN_COMMAND_SUBMIT_JOB:248 case GEARMAN_COMMAND_SUBMIT_JOB:
251 case GEARMAN_COMMAND_SUBMIT_JOB_LOW:249 case GEARMAN_COMMAND_SUBMIT_JOB_LOW:
252 case GEARMAN_COMMAND_SUBMIT_JOB_HIGH:250 case GEARMAN_COMMAND_SUBMIT_JOB_HIGH:
253 rc= libgearman::protocol::submit(task->client->impl()->universal,251 rc= libgearman::protocol::submit(task->client->universal,
254 task->send,252 task->send,
255 final_unique,253 final_unique,
256 command,254 command,
@@ -259,7 +257,7 @@
259 break;257 break;
260258
261 case GEARMAN_COMMAND_SUBMIT_JOB_EPOCH:259 case GEARMAN_COMMAND_SUBMIT_JOB_EPOCH:
262 rc= libgearman::protocol::submit_epoch(task->client->impl()->universal,260 rc= libgearman::protocol::submit_epoch(task->client->universal,
263 task->send,261 task->send,
264 final_unique,262 final_unique,
265 function,263 function,
@@ -270,7 +268,7 @@
270 case GEARMAN_COMMAND_SUBMIT_JOB_BG:268 case GEARMAN_COMMAND_SUBMIT_JOB_BG:
271 case GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG:269 case GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG:
272 case GEARMAN_COMMAND_SUBMIT_JOB_HIGH_BG:270 case GEARMAN_COMMAND_SUBMIT_JOB_HIGH_BG:
273 rc= libgearman::protocol::submit_background(task->client->impl()->universal,271 rc= libgearman::protocol::submit_background(task->client->universal,
274 task->send,272 task->send,
275 final_unique,273 final_unique,
276 command,274 command,
@@ -379,15 +377,16 @@
379 return NULL;377 return NULL;
380 }378 }
381379
382 gearman_task_st *task= gearman_task_internal_create(*client, NULL);380 gearman_task_st *task_shell= gearman_task_internal_create(*client, NULL);
383 if (task == NULL)381 if (task_shell == NULL)
384 {382 {
385 assert(client->impl()->universal.error_code());383 assert(client->impl()->universal.error_code());
386 return NULL;384 return NULL;
387 }385 }
388386
389 task->impl()->context= context;387 Task* task= task_shell->impl();
390 task->impl()->func= actions;388 task->context= context;
389 task->func= actions;
391390
392 /**391 /**
393 @todo fix it so that NULL is done by default by the API not by happenstance.392 @todo fix it so that NULL is done by default by the API not by happenstance.
@@ -413,33 +412,33 @@
413412
414 if (gearman_unique_is_hash(unique))413 if (gearman_unique_is_hash(unique))
415 {414 {
416 task->impl()->unique_length= snprintf(task->impl()->unique, GEARMAN_MAX_UNIQUE_SIZE, "%u", libhashkit_murmur3(gearman_string_param(workload)));415 task->unique_length= snprintf(task->unique, GEARMAN_MAX_UNIQUE_SIZE, "%u", libhashkit_murmur3(gearman_string_param(workload)));
417 }416 }
418 else if ((task->impl()->unique_length= gearman_size(unique)))417 else if ((task->unique_length= gearman_size(unique)))
419 {418 {
420 if (task->impl()->unique_length >= GEARMAN_MAX_UNIQUE_SIZE)419 if (task->unique_length >= GEARMAN_MAX_UNIQUE_SIZE)
421 {420 {
422 task->impl()->unique_length= GEARMAN_MAX_UNIQUE_SIZE -1; // Leave space for NULL byte421 task->unique_length= GEARMAN_MAX_UNIQUE_SIZE -1; // Leave space for NULL byte
423 }422 }
424423
425 strncpy(task->impl()->unique, gearman_c_str(unique), GEARMAN_MAX_UNIQUE_SIZE);424 strncpy(task->unique, gearman_c_str(unique), GEARMAN_MAX_UNIQUE_SIZE);
426 task->impl()->unique[task->impl()->unique_length]= 0;425 task->unique[task->unique_length]= 0;
427 }426 }
428 else427 else
429 {428 {
430 if (client->impl()->options.generate_unique or is_background(command))429 if (client->impl()->options.generate_unique or is_background(command))
431 {430 {
432 safe_uuid_generate(task->impl()->unique, task->impl()->unique_length);431 safe_uuid_generate(task->unique, task->unique_length);
433 }432 }
434 else433 else
435 {434 {
436 task->impl()->unique_length= 0;435 task->unique_length= 0;
437 task->impl()->unique[0]= 0;436 task->unique[0]= 0;
438 }437 }
439 }438 }
440439
441 args[1]= task->impl()->unique;440 args[1]= task->unique;
442 args_size[1]= task->impl()->unique_length +1; // +1 is for the needed null441 args_size[1]= task->unique_length +1; // +1 is for the needed null
443442
444 assert_msg(command == GEARMAN_COMMAND_SUBMIT_REDUCE_JOB or command == GEARMAN_COMMAND_SUBMIT_REDUCE_JOB_BACKGROUND,443 assert_msg(command == GEARMAN_COMMAND_SUBMIT_REDUCE_JOB or command == GEARMAN_COMMAND_SUBMIT_REDUCE_JOB_BACKGROUND,
445 "Command was not appropriate for request");444 "Command was not appropriate for request");
@@ -474,14 +473,14 @@
474 args_size[4]= gearman_size(workload);473 args_size[4]= gearman_size(workload);
475474
476 gearman_return_t rc;475 gearman_return_t rc;
477 if (gearman_success(rc= gearman_packet_create_args(client->impl()->universal, task->impl()->send,476 if (gearman_success(rc= gearman_packet_create_args(client->impl()->universal, task->send,
478 GEARMAN_MAGIC_REQUEST, command,477 GEARMAN_MAGIC_REQUEST, command,
479 args, args_size,478 args, args_size,
480 5)))479 5)))
481 {480 {
482 client->impl()->new_tasks++;481 client->impl()->new_tasks++;
483 client->impl()->running_tasks++;482 client->impl()->running_tasks++;
484 task->impl()->options.send_in_use= true;483 task->options.send_in_use= true;
485 }484 }
486 else485 else
487 {486 {
@@ -489,7 +488,7 @@
489 gearman_task_free(task);488 gearman_task_free(task);
490 task= NULL;489 task= NULL;
491 }490 }
492 task->impl()->type= GEARMAN_TASK_KIND_EXECUTE;491 task->type= GEARMAN_TASK_KIND_EXECUTE;
493492
494 return task;493 return task->shell();
495}494}
496495
=== modified file 'libgearman/client.cc'
--- libgearman/client.cc 2013-07-06 08:02:19 +0000
+++ libgearman/client.cc 2013-07-06 19:18:28 +0000
@@ -89,7 +89,7 @@
89/**89/**
90 * Real do function.90 * Real do function.
91 */91 */
92static void *_client_do(gearman_client_st *client, gearman_command_t command,92static void *_client_do(gearman_client_st *client_shell, gearman_command_t command,
93 const char *function_name,93 const char *function_name,
94 const char *unique,94 const char *unique,
95 const void *workload_str, size_t workload_size,95 const void *workload_str, size_t workload_size,
@@ -101,13 +101,15 @@
101 ret_ptr= &unused;101 ret_ptr= &unused;
102 }102 }
103103
104 if (client == NULL or client->impl() == NULL)104 if (client_shell == NULL or client_shell->impl() == NULL)
105 {105 {
106 *ret_ptr= GEARMAN_INVALID_ARGUMENT;106 *ret_ptr= GEARMAN_INVALID_ARGUMENT;
107 return NULL;107 return NULL;
108 }108 }
109109
110 client->impl()->universal.reset_error();110 Client* client= client_shell->impl();
111
112 client->universal.reset_error();
111113
112 size_t unused_size;114 size_t unused_size;
113 if (result_size == NULL)115 if (result_size == NULL)
@@ -122,17 +124,17 @@
122124
123 gearman_task_st do_task;125 gearman_task_st do_task;
124 {126 {
125 client->impl()->universal.options.no_new_data= true;127 client->universal.options.no_new_data= true;
126 gearman_task_st *do_task_ptr= add_task(*client, &do_task, NULL, command,128 gearman_task_st *do_task_ptr= add_task(*(client->shell()), &do_task, NULL, command,
127 function,129 function,
128 local_unique,130 local_unique,
129 workload,131 workload,
130 time_t(0),132 time_t(0),
131 gearman_actions_do_default());133 gearman_actions_do_default());
132 client->impl()->universal.options.no_new_data= false;134 client->universal.options.no_new_data= false;
133 if (do_task_ptr == NULL)135 if (do_task_ptr == NULL)
134 {136 {
135 *ret_ptr= client->impl()->universal.error_code();137 *ret_ptr= client->universal.error_code();
136 return NULL;138 return NULL;
137 }139 }
138 assert_msg(do_task.impl(), "Bad return by add_task()");140 assert_msg(do_task.impl(), "Bad return by add_task()");
@@ -141,10 +143,10 @@
141143
142 do_task.impl()->type= GEARMAN_TASK_KIND_DO;144 do_task.impl()->type= GEARMAN_TASK_KIND_DO;
143145
144 gearman_return_t ret= gearman_client_run_block_tasks(client, &do_task);146 gearman_return_t ret= gearman_client_run_block_tasks(client->shell(), &do_task);
145147
146 // gearman_client_run_tasks failed148 // gearman_client_run_tasks failed
147 assert(client->impl()->task_list); // Programmer error, we should always have the task that we used for do149 assert(client->task_list); // Programmer error, we should always have the task that we used for do
148150
149 char *returnable= NULL;151 char *returnable= NULL;
150 if (gearman_failed(ret))152 if (gearman_failed(ret))
@@ -153,7 +155,7 @@
153 { }155 { }
154 else156 else
155 {157 {
156 gearman_error(client->impl()->universal, ret, "occured during gearman_client_run_tasks()");158 gearman_error(client->universal, ret, "occured during gearman_client_run_tasks()");
157 }159 }
158160
159 *ret_ptr= ret;161 *ret_ptr= ret;
@@ -164,13 +166,13 @@
164 *ret_ptr= do_task.impl()->result_rc;166 *ret_ptr= do_task.impl()->result_rc;
165 if (gearman_task_result(&do_task))167 if (gearman_task_result(&do_task))
166 {168 {
167 if (gearman_has_allocator(client->impl()->universal))169 if (gearman_has_allocator(client->universal))
168 {170 {
169 gearman_string_t result= gearman_result_string(do_task.impl()->result());171 gearman_string_t result= gearman_result_string(do_task.impl()->result());
170 returnable= static_cast<char *>(gearman_malloc(client->impl()->universal, gearman_size(result) +1));172 returnable= static_cast<char *>(gearman_malloc(client->universal, gearman_size(result) +1));
171 if (returnable == NULL)173 if (returnable == NULL)
172 {174 {
173 gearman_error(client->impl()->universal, GEARMAN_MEMORY_ALLOCATION_FAILURE, "custom workload_fn failed to allocate memory");175 gearman_error(client->universal, GEARMAN_MEMORY_ALLOCATION_FAILURE, "custom workload_fn failed to allocate memory");
174 *result_size= 0;176 *result_size= 0;
175 }177 }
176 else // NULL terminate178 else // NULL terminate
@@ -194,15 +196,15 @@
194 }196 }
195 else // gearman_client_run_tasks() was successful, but the task was not197 else // gearman_client_run_tasks() was successful, but the task was not
196 {198 {
197 gearman_error(client->impl()->universal, do_task.impl()->result_rc, "occured during gearman_client_run_tasks()");199 gearman_error(client->universal, do_task.impl()->result_rc, "occured during gearman_client_run_tasks()");
198200
199 *ret_ptr= do_task.impl()->result_rc;201 *ret_ptr= do_task.impl()->result_rc;
200 *result_size= 0;202 *result_size= 0;
201 }203 }
202204
203 gearman_task_free(&do_task);205 gearman_task_free(&do_task);
204 client->impl()->new_tasks= 0;206 client->new_tasks= 0;
205 client->impl()->running_tasks= 0;207 client->running_tasks= 0;
206208
207 return returnable;209 return returnable;
208}210}
@@ -210,31 +212,32 @@
210/*212/*
211 Real background do function.213 Real background do function.
212*/214*/
213static gearman_return_t _client_do_background(gearman_client_st *client,215static gearman_return_t _client_do_background(gearman_client_st* client_shell,
214 gearman_command_t command,216 gearman_command_t command,
215 gearman_string_t &function,217 gearman_string_t &function,
216 gearman_unique_t &unique,218 gearman_unique_t &unique,
217 gearman_string_t &workload,219 gearman_string_t &workload,
218 gearman_job_handle_t job_handle)220 gearman_job_handle_t job_handle)
219{221{
220 if (client == NULL)222 if (client_shell == NULL or client_shell->impl() == NULL)
221 {223 {
222 return GEARMAN_INVALID_ARGUMENT;224 return GEARMAN_INVALID_ARGUMENT;
223 }225 }
224226
225 client->impl()->universal.reset_error();227 Client* client= client_shell->impl();
228 client->universal.reset_error();
226229
227 if (gearman_size(function) == 0)230 if (gearman_size(function) == 0)
228 {231 {
229 return gearman_error(client->impl()->universal, GEARMAN_INVALID_ARGUMENT, "function argument was empty");232 return gearman_error(client->universal, GEARMAN_INVALID_ARGUMENT, "function argument was empty");
230 }233 }
231234
232 client->impl()->_do_handle[0]= 0; // Reset the job_handle we store in client235 client->_do_handle[0]= 0; // Reset the job_handle we store in client
233236
234 gearman_task_st do_task;237 gearman_task_st do_task;
235 {238 {
236 client->impl()->universal.options.no_new_data= true;239 client->universal.options.no_new_data= true;
237 gearman_task_st* do_task_ptr= add_task(*client, &do_task, 240 gearman_task_st* do_task_ptr= add_task(*client_shell, &do_task,
238 client, 241 client,
239 command,242 command,
240 function,243 function,
@@ -242,26 +245,26 @@
242 workload,245 workload,
243 time_t(0),246 time_t(0),
244 gearman_actions_do_default());247 gearman_actions_do_default());
245 client->impl()->universal.options.no_new_data= false;248 client->universal.options.no_new_data= false;
246249
247 if (do_task_ptr == NULL)250 if (do_task_ptr == NULL)
248 {251 {
249 return client->impl()->universal.error_code();252 return client->universal.error_code();
250 }253 }
251 assert(do_task_ptr);254 assert(do_task_ptr);
252 assert(&do_task == do_task_ptr);255 assert(&do_task == do_task_ptr);
253 }256 }
254 do_task.impl()->type= GEARMAN_TASK_KIND_DO;257 do_task.impl()->type= GEARMAN_TASK_KIND_DO;
255258
256 gearman_return_t ret= gearman_client_run_block_tasks(client, &do_task);259 gearman_return_t ret= gearman_client_run_block_tasks(client_shell, &do_task);
257260
258 if (job_handle)261 if (job_handle)
259 {262 {
260 strncpy(job_handle, do_task.impl()->job_handle, GEARMAN_JOB_HANDLE_SIZE);263 strncpy(job_handle, do_task.impl()->job_handle, GEARMAN_JOB_HANDLE_SIZE);
261 }264 }
262 strncpy(client->impl()->_do_handle, do_task.impl()->job_handle, GEARMAN_JOB_HANDLE_SIZE);265 strncpy(client->_do_handle, do_task.impl()->job_handle, GEARMAN_JOB_HANDLE_SIZE);
263 client->impl()->new_tasks= 0;266 client->new_tasks= 0;
264 client->impl()->running_tasks= 0;267 client->running_tasks= 0;
265 gearman_task_free(&do_task);268 gearman_task_free(&do_task);
266269
267 return ret;270 return ret;
@@ -393,31 +396,33 @@
393{396{
394 if (client_shell and client_shell->impl())397 if (client_shell and client_shell->impl())
395 {398 {
399 Client* client= client_shell->impl();
400
396 switch (option)401 switch (option)
397 {402 {
398 case GEARMAN_CLIENT_ALLOCATED:403 case GEARMAN_CLIENT_ALLOCATED:
399 return gearman_is_allocated(client_shell);404 return gearman_is_allocated(client_shell);
400405
401 case GEARMAN_CLIENT_NON_BLOCKING:406 case GEARMAN_CLIENT_NON_BLOCKING:
402 return client_shell->impl()->options.non_blocking;407 return client->options.non_blocking;
403408
404 case GEARMAN_CLIENT_UNBUFFERED_RESULT:409 case GEARMAN_CLIENT_UNBUFFERED_RESULT:
405 return client_shell->impl()->options.unbuffered_result;410 return client->options.unbuffered_result;
406411
407 case GEARMAN_CLIENT_NO_NEW:412 case GEARMAN_CLIENT_NO_NEW:
408 return client_shell->impl()->options.no_new;413 return client->options.no_new;
409414
410 case GEARMAN_CLIENT_FREE_TASKS:415 case GEARMAN_CLIENT_FREE_TASKS:
411 return client_shell->impl()->options.free_tasks;416 return client->options.free_tasks;
412417
413 case GEARMAN_CLIENT_GENERATE_UNIQUE:418 case GEARMAN_CLIENT_GENERATE_UNIQUE:
414 return client_shell->impl()->options.generate_unique;419 return client->options.generate_unique;
415420
416 case GEARMAN_CLIENT_EXCEPTION:421 case GEARMAN_CLIENT_EXCEPTION:
417 return client_shell->impl()->options.exceptions;422 return client->options.exceptions;
418423
419 case GEARMAN_CLIENT_SSL:424 case GEARMAN_CLIENT_SSL:
420 return client_shell->impl()->ssl();425 return client->ssl();
421426
422 default:427 default:
423 case GEARMAN_CLIENT_TASK_IN_USE:428 case GEARMAN_CLIENT_TASK_IN_USE:
@@ -463,35 +468,36 @@
463{468{
464 if (client_shell and client_shell->impl())469 if (client_shell and client_shell->impl())
465 {470 {
471 Client* client= client_shell->impl();
466 if (options & GEARMAN_CLIENT_NON_BLOCKING)472 if (options & GEARMAN_CLIENT_NON_BLOCKING)
467 {473 {
468 gearman_universal_add_options(client_shell->impl()->universal, GEARMAN_UNIVERSAL_NON_BLOCKING);474 gearman_universal_add_options(client->universal, GEARMAN_UNIVERSAL_NON_BLOCKING);
469 client_shell->impl()->options.non_blocking= true;475 client->options.non_blocking= true;
470 }476 }
471477
472 if (options & GEARMAN_CLIENT_UNBUFFERED_RESULT)478 if (options & GEARMAN_CLIENT_UNBUFFERED_RESULT)
473 {479 {
474 client_shell->impl()->options.unbuffered_result= true;480 client->options.unbuffered_result= true;
475 }481 }
476482
477 if (options & GEARMAN_CLIENT_FREE_TASKS)483 if (options & GEARMAN_CLIENT_FREE_TASKS)
478 {484 {
479 client_shell->impl()->options.free_tasks= true;485 client->options.free_tasks= true;
480 }486 }
481487
482 if (options & GEARMAN_CLIENT_GENERATE_UNIQUE)488 if (options & GEARMAN_CLIENT_GENERATE_UNIQUE)
483 {489 {
484 client_shell->impl()->options.generate_unique= true;490 client->options.generate_unique= true;
485 }491 }
486492
487 if (options & GEARMAN_CLIENT_EXCEPTION)493 if (options & GEARMAN_CLIENT_EXCEPTION)
488 {494 {
489 client_shell->impl()->options.exceptions= gearman_client_set_server_option(client_shell, gearman_literal_param("exceptions"));495 client->options.exceptions= gearman_client_set_server_option(client_shell, gearman_literal_param("exceptions"));
490 }496 }
491497
492 if (options & GEARMAN_CLIENT_SSL)498 if (options & GEARMAN_CLIENT_SSL)
493 {499 {
494 client_shell->impl()->ssl(true);500 client->ssl(true);
495 }501 }
496 }502 }
497}503}
@@ -501,25 +507,27 @@
501{507{
502 if (client_shell and client_shell->impl())508 if (client_shell and client_shell->impl())
503 {509 {
510 Client* client= client_shell->impl();
511
504 if (options & GEARMAN_CLIENT_NON_BLOCKING)512 if (options & GEARMAN_CLIENT_NON_BLOCKING)
505 {513 {
506 gearman_universal_remove_options(client_shell->impl()->universal, GEARMAN_UNIVERSAL_NON_BLOCKING);514 gearman_universal_remove_options(client->universal, GEARMAN_UNIVERSAL_NON_BLOCKING);
507 client_shell->impl()->options.non_blocking= false;515 client->options.non_blocking= false;
508 }516 }
509517
510 if (options & GEARMAN_CLIENT_UNBUFFERED_RESULT)518 if (options & GEARMAN_CLIENT_UNBUFFERED_RESULT)
511 {519 {
512 client_shell->impl()->options.unbuffered_result= false;520 client->options.unbuffered_result= false;
513 }521 }
514522
515 if (options & GEARMAN_CLIENT_FREE_TASKS)523 if (options & GEARMAN_CLIENT_FREE_TASKS)
516 {524 {
517 client_shell->impl()->options.free_tasks= false;525 client->options.free_tasks= false;
518 }526 }
519527
520 if (options & GEARMAN_CLIENT_GENERATE_UNIQUE)528 if (options & GEARMAN_CLIENT_GENERATE_UNIQUE)
521 {529 {
522 client_shell->impl()->options.generate_unique= false;530 client->options.generate_unique= false;
523 }531 }
524 }532 }
525}533}
@@ -988,13 +996,14 @@
988 return gearman_echo(client->impl()->universal, workload, workload_size);996 return gearman_echo(client->impl()->universal, workload, workload_size);
989}997}
990998
991void gearman_client_task_free_all(gearman_client_st *client)999void gearman_client_task_free_all(gearman_client_st *client_shell)
992{1000{
993 if (client and client->impl() and client->impl()->task_list)1001 if (client_shell and client_shell->impl() and client_shell->impl()->task_list)
994 {1002 {
995 while (client->impl()->task_list)1003 Client* client= client_shell->impl();
1004 while (client->task_list)
996 {1005 {
997 gearman_task_free(client->impl()->task_list);1006 gearman_task_free(client->task_list);
998 }1007 }
999 }1008 }
1000}1009}
@@ -1397,96 +1406,96 @@
1397 }1406 }
1398}1407}
13991408
1400static inline gearman_return_t _client_run_tasks(gearman_client_st *client, gearman_task_st* exit_task)1409static inline gearman_return_t _client_run_tasks(gearman_client_st *client_shell, gearman_task_st* exit_task)
1401{1410{
1402 gearman_return_t ret= GEARMAN_MAX_RETURN;1411 gearman_return_t ret= GEARMAN_MAX_RETURN;
14031412
1404 switch(client->impl()->state)1413 Client* client= client_shell->impl();
1414
1415 switch(client->state)
1405 {1416 {
1406 case GEARMAN_CLIENT_STATE_IDLE:1417 case GEARMAN_CLIENT_STATE_IDLE:
1407 while (1)1418 while (1)
1408 {1419 {
1409 /* Start any new tasks. */1420 /* Start any new tasks. */
1410 if (client->impl()->new_tasks > 0 && ! (client->impl()->options.no_new))1421 if (client->new_tasks > 0 && ! (client->options.no_new))
1411 {1422 {
1412 for (client->impl()->task= client->impl()->task_list; client->impl()->task;1423 for (client->task= client->task_list; client->task;
1413 client->impl()->task= client->impl()->task->impl()->next)1424 client->task= client->task->impl()->next)
1414 {1425 {
1415 if (client->impl()->task->impl()->state != GEARMAN_TASK_STATE_NEW)1426 if (client->task->impl()->state != GEARMAN_TASK_STATE_NEW)
1416 {1427 {
1417 continue;1428 continue;
1418 }1429 }
14191430
1420 case GEARMAN_CLIENT_STATE_NEW:1431 case GEARMAN_CLIENT_STATE_NEW:
1421 if (client->impl()->task == NULL)1432 if (client->task == NULL)
1422 {1433 {
1423 client->impl()->state= GEARMAN_CLIENT_STATE_IDLE;1434 client->state= GEARMAN_CLIENT_STATE_IDLE;
1424 break;1435 break;
1425 }1436 }
14261437
1427 assert_msg(client == client->impl()->task->impl()->client, "Programmer error, client and task member client are not the same");1438 gearman_return_t local_ret= _client_run_task(client->task->impl());
1428 gearman_return_t local_ret= _client_run_task(client->impl()->task->impl());
1429 if (gearman_failed(local_ret) and local_ret != GEARMAN_IO_WAIT)1439 if (gearman_failed(local_ret) and local_ret != GEARMAN_IO_WAIT)
1430 {1440 {
1431 client->impl()->state= GEARMAN_CLIENT_STATE_NEW;1441 client->state= GEARMAN_CLIENT_STATE_NEW;
14321442
1433 return local_ret;1443 return local_ret;
1434 }1444 }
1435 }1445 }
14361446
1437 if (client->impl()->new_tasks == 0)1447 if (client->new_tasks == 0)
1438 {1448 {
1439 gearman_flush_all(client->impl()->universal);1449 gearman_flush_all(client->universal);
1440 }1450 }
1441 }1451 }
14421452
1443 /* See if there are any connections ready for I/O. */1453 /* See if there are any connections ready for I/O. */
1444 while ((client->impl()->con= gearman_ready(client->impl()->universal)))1454 while ((client->con= gearman_ready(client->universal)))
1445 {1455 {
1446 if (client->impl()->con->revents & (POLLOUT | POLLERR | POLLHUP | POLLNVAL))1456 if (client->con->revents & (POLLOUT | POLLERR | POLLHUP | POLLNVAL))
1447 {1457 {
1448 /* Socket is ready for writing, continue submitting jobs. */1458 /* Socket is ready for writing, continue submitting jobs. */
1449 for (client->impl()->task= client->impl()->task_list; client->impl()->task;1459 for (client->task= client->task_list; client->task;
1450 client->impl()->task= client->impl()->task->impl()->next)1460 client->task= client->task->impl()->next)
1451 {1461 {
1452 if (client->impl()->task->impl()->con != client->impl()->con or1462 if (client->task->impl()->con != client->con or
1453 (client->impl()->task->impl()->state != GEARMAN_TASK_STATE_SUBMIT and1463 (client->task->impl()->state != GEARMAN_TASK_STATE_SUBMIT and
1454 client->impl()->task->impl()->state != GEARMAN_TASK_STATE_WORKLOAD))1464 client->task->impl()->state != GEARMAN_TASK_STATE_WORKLOAD))
1455 {1465 {
1456 continue;1466 continue;
1457 }1467 }
14581468
1459 case GEARMAN_CLIENT_STATE_SUBMIT:1469 case GEARMAN_CLIENT_STATE_SUBMIT:
1460 if (client->impl()->task == NULL)1470 if (client->task == NULL)
1461 {1471 {
1462 client->impl()->state= GEARMAN_CLIENT_STATE_IDLE;1472 client->state= GEARMAN_CLIENT_STATE_IDLE;
1463 break;1473 break;
1464 }1474 }
1465 assert_msg(client == client->impl()->task->impl()->client, "Programmer error, client and task member client are not the same");1475 gearman_return_t local_ret= _client_run_task(client->task->impl());
1466 gearman_return_t local_ret= _client_run_task(client->impl()->task->impl());
1467 if (local_ret == GEARMAN_COULD_NOT_CONNECT)1476 if (local_ret == GEARMAN_COULD_NOT_CONNECT)
1468 {1477 {
1469 client->impl()->state= GEARMAN_CLIENT_STATE_IDLE;1478 client->state= GEARMAN_CLIENT_STATE_IDLE;
1470 return local_ret;1479 return local_ret;
1471 }1480 }
1472 else if (gearman_failed(local_ret) and local_ret != GEARMAN_IO_WAIT)1481 else if (gearman_failed(local_ret) and local_ret != GEARMAN_IO_WAIT)
1473 {1482 {
1474 client->impl()->state= GEARMAN_CLIENT_STATE_SUBMIT;1483 client->state= GEARMAN_CLIENT_STATE_SUBMIT;
1475 return local_ret;1484 return local_ret;
1476 }1485 }
1477 }1486 }
14781487
1479 /* Connection errors are fatal. */1488 /* Connection errors are fatal. */
1480 if (client->impl()->con->revents & (POLLERR | POLLHUP | POLLNVAL))1489 if (client->con->revents & (POLLERR | POLLHUP | POLLNVAL))
1481 {1490 {
1482 gearman_error(client->impl()->universal, GEARMAN_LOST_CONNECTION, "detected lost connection in _client_run_tasks()");1491 gearman_error(client->universal, GEARMAN_LOST_CONNECTION, "detected lost connection in _client_run_tasks()");
1483 client->impl()->con->close_socket();1492 client->con->close_socket();
1484 client->impl()->state= GEARMAN_CLIENT_STATE_IDLE;1493 client->state= GEARMAN_CLIENT_STATE_IDLE;
1485 return GEARMAN_LOST_CONNECTION;1494 return GEARMAN_LOST_CONNECTION;
1486 }1495 }
1487 }1496 }
14881497
1489 if ((client->impl()->con->revents & POLLIN) == 0)1498 if ((client->con->revents & POLLIN) == 0)
1490 {1499 {
1491 continue;1500 continue;
1492 }1501 }
@@ -1495,17 +1504,17 @@
1495 while (1)1504 while (1)
1496 {1505 {
1497 /* Read packet on connection and find which task it belongs to. */1506 /* Read packet on connection and find which task it belongs to. */
1498 if (client->impl()->options.unbuffered_result)1507 if (client->options.unbuffered_result)
1499 {1508 {
1500 /* If client is handling the data read, make sure it's complete. */1509 /* If client is handling the data read, make sure it's complete. */
1501 if (client->impl()->con->recv_state == GEARMAN_CON_RECV_STATE_READ_DATA)1510 if (client->con->recv_state == GEARMAN_CON_RECV_STATE_READ_DATA)
1502 {1511 {
1503 for (client->impl()->task= client->impl()->task_list; client->impl()->task;1512 for (client->task= client->task_list; client->task;
1504 client->impl()->task= client->impl()->task->impl()->next)1513 client->task= client->task->impl()->next)
1505 {1514 {
1506 if (client->impl()->task->impl()->con == client->impl()->con &&1515 if (client->task->impl()->con == client->con &&
1507 (client->impl()->task->impl()->state == GEARMAN_TASK_STATE_DATA or1516 (client->task->impl()->state == GEARMAN_TASK_STATE_DATA or
1508 client->impl()->task->impl()->state == GEARMAN_TASK_STATE_COMPLETE))1517 client->task->impl()->state == GEARMAN_TASK_STATE_COMPLETE))
1509 {1518 {
1510 break;1519 break;
1511 }1520 }
@@ -1515,24 +1524,24 @@
1515 Someone has set GEARMAN_CLIENT_UNBUFFERED_RESULT but hasn't setup the client to fetch data correctly.1524 Someone has set GEARMAN_CLIENT_UNBUFFERED_RESULT but hasn't setup the client to fetch data correctly.
1516 Fatal error :(1525 Fatal error :(
1517 */1526 */
1518 return gearman_universal_set_error(client->impl()->universal, GEARMAN_INVALID_ARGUMENT, GEARMAN_AT,1527 return gearman_universal_set_error(client->universal, GEARMAN_INVALID_ARGUMENT, GEARMAN_AT,
1519 "client created with GEARMAN_CLIENT_UNBUFFERED_RESULT, but was not setup to use it. %s", __func__);1528 "client created with GEARMAN_CLIENT_UNBUFFERED_RESULT, but was not setup to use it. %s", __func__);
1520 }1529 }
1521 else1530 else
1522 {1531 {
1523 /* Read the next packet, without buffering the data part. */1532 /* Read the next packet, without buffering the data part. */
1524 client->impl()->task= NULL;1533 client->task= NULL;
1525 (void)client->impl()->con->receiving(client->impl()->con->_packet, ret, false);1534 (void)client->con->receiving(client->con->_packet, ret, false);
1526 }1535 }
1527 }1536 }
1528 else1537 else
1529 {1538 {
1530 /* Read the next packet, buffering the data part. */1539 /* Read the next packet, buffering the data part. */
1531 client->impl()->task= NULL;1540 client->task= NULL;
1532 (void)client->impl()->con->receiving(client->impl()->con->_packet, ret, true);1541 (void)client->con->receiving(client->con->_packet, ret, true);
1533 }1542 }
15341543
1535 if (client->impl()->task == NULL)1544 if (client->task == NULL)
1536 {1545 {
1537 assert(ret != GEARMAN_MAX_RETURN);1546 assert(ret != GEARMAN_MAX_RETURN);
15381547
@@ -1544,71 +1553,71 @@
1544 break;1553 break;
1545 }1554 }
15461555
1547 client->impl()->state= GEARMAN_CLIENT_STATE_IDLE;1556 client->state= GEARMAN_CLIENT_STATE_IDLE;
1548 return ret;1557 return ret;
1549 }1558 }
15501559
1551 client->impl()->con->options.packet_in_use= true;1560 client->con->options.packet_in_use= true;
15521561
1553 /* We have a packet, see which task it belongs to. */1562 /* We have a packet, see which task it belongs to. */
1554 for (client->impl()->task= client->impl()->task_list; client->impl()->task;1563 for (client->task= client->task_list; client->task;
1555 client->impl()->task= client->impl()->task->impl()->next)1564 client->task= client->task->impl()->next)
1556 {1565 {
1557 if (client->impl()->task->impl()->con != client->impl()->con)1566 if (client->task->impl()->con != client->con)
1558 {1567 {
1559 continue;1568 continue;
1560 }1569 }
15611570
1562 if (client->impl()->con->_packet.command == GEARMAN_COMMAND_JOB_CREATED)1571 if (client->con->_packet.command == GEARMAN_COMMAND_JOB_CREATED)
1563 {1572 {
1564 if (client->impl()->task->impl()->created_id != client->impl()->con->created_id)1573 if (client->task->impl()->created_id != client->con->created_id)
1565 {1574 {
1566 continue;1575 continue;
1567 }1576 }
15681577
1569 /* New job created, drop through below and notify task. */1578 /* New job created, drop through below and notify task. */
1570 client->impl()->con->created_id++;1579 client->con->created_id++;
1571 }1580 }
1572 else if (client->impl()->con->_packet.command == GEARMAN_COMMAND_ERROR)1581 else if (client->con->_packet.command == GEARMAN_COMMAND_ERROR)
1573 {1582 {
1574 gearman_return_t maybe_server_error= string2return_code(static_cast<char *>(client->impl()->con->_packet.arg[0]), int(client->impl()->con->_packet.arg_size[0]));1583 gearman_return_t maybe_server_error= string2return_code(static_cast<char *>(client->con->_packet.arg[0]), int(client->con->_packet.arg_size[0]));
15751584
1576 if (maybe_server_error == GEARMAN_MAX_RETURN)1585 if (maybe_server_error == GEARMAN_MAX_RETURN)
1577 {1586 {
1578 maybe_server_error= GEARMAN_SERVER_ERROR;1587 maybe_server_error= GEARMAN_SERVER_ERROR;
1579 }1588 }
15801589
1581 gearman_universal_set_error(client->impl()->universal, maybe_server_error, GEARMAN_AT,1590 gearman_universal_set_error(client->universal, maybe_server_error, GEARMAN_AT,
1582 "%s:%.*s",1591 "%s:%.*s",
1583 static_cast<char *>(client->impl()->con->_packet.arg[0]),1592 static_cast<char *>(client->con->_packet.arg[0]),
1584 int(client->impl()->con->_packet.arg_size[1]),1593 int(client->con->_packet.arg_size[1]),
1585 static_cast<char *>(client->impl()->con->_packet.arg[1]));1594 static_cast<char *>(client->con->_packet.arg[1]));
15861595
1587 /* 1596 /*
1588 Packet cleanup copied from "Clean up the packet" below, and must1597 Packet cleanup copied from "Clean up the packet" below, and must
1589 remain in sync with its reference.1598 remain in sync with its reference.
1590 */1599 */
1591 gearman_packet_free(&(client->impl()->con->_packet));1600 gearman_packet_free(&(client->con->_packet));
1592 client->impl()->con->options.packet_in_use= false;1601 client->con->options.packet_in_use= false;
15931602
1594 /* This step copied from _client_run_tasks() above: */1603 /* This step copied from _client_run_tasks() above: */
1595 /* Increment this value because new job created then failed. */1604 /* Increment this value because new job created then failed. */
1596 client->impl()->con->created_id++;1605 client->con->created_id++;
15971606
1598 return maybe_server_error;1607 return maybe_server_error;
1599 }1608 }
1600 else if (client->impl()->con->_packet.command == GEARMAN_COMMAND_STATUS_RES_UNIQUE and1609 else if (client->con->_packet.command == GEARMAN_COMMAND_STATUS_RES_UNIQUE and
1601 (strncmp(gearman_task_unique(client->impl()->task),1610 (strncmp(gearman_task_unique(client->task),
1602 static_cast<char *>(client->impl()->con->_packet.arg[0]),1611 static_cast<char *>(client->con->_packet.arg[0]),
1603 client->impl()->con->_packet.arg_size[0]) == 0))1612 client->con->_packet.arg_size[0]) == 0))
1604 { }1613 { }
1605 else if (strncmp(client->impl()->task->impl()->job_handle,1614 else if (strncmp(client->task->impl()->job_handle,
1606 static_cast<char *>(client->impl()->con->_packet.arg[0]),1615 static_cast<char *>(client->con->_packet.arg[0]),
1607 client->impl()->con->_packet.arg_size[0]) ||1616 client->con->_packet.arg_size[0]) ||
1608 (client->impl()->con->_packet.failed() == false &&1617 (client->con->_packet.failed() == false &&
1609 strlen(client->impl()->task->impl()->job_handle) != client->impl()->con->_packet.arg_size[0] - 1) ||1618 strlen(client->task->impl()->job_handle) != client->con->_packet.arg_size[0] - 1) ||
1610 (client->impl()->con->_packet.failed() &&1619 (client->con->_packet.failed() &&
1611 strlen(client->impl()->task->impl()->job_handle) != client->impl()->con->_packet.arg_size[0]))1620 strlen(client->task->impl()->job_handle) != client->con->_packet.arg_size[0]))
1612 {1621 {
1613 continue;1622 continue;
1614 }1623 }
@@ -1618,20 +1627,19 @@
1618 break;1627 break;
1619 }1628 }
16201629
1621 if (client->impl()->task == NULL)1630 if (client->task == NULL)
1622 {1631 {
1623 /* The client has stopped waiting for the response, ignore it. */1632 /* The client has stopped waiting for the response, ignore it. */
1624 client->impl()->con->free_private_packet();1633 client->con->free_private_packet();
1625 continue;1634 continue;
1626 }1635 }
16271636
1628 client->impl()->task->impl()->recv= &(client->impl()->con->_packet);1637 client->task->impl()->recv= &(client->con->_packet);
1629 }1638 }
16301639
1631 case GEARMAN_CLIENT_STATE_PACKET:1640 case GEARMAN_CLIENT_STATE_PACKET:
1632 /* Let task process job created or result packet. */1641 /* Let task process job created or result packet. */
1633 assert_msg(client == client->impl()->task->impl()->client, "Programmer error, client and task member client are not the same");1642 gearman_return_t local_ret= _client_run_task(client->task->impl());
1634 gearman_return_t local_ret= _client_run_task(client->impl()->task->impl());
1635 if (local_ret == GEARMAN_IO_WAIT)1643 if (local_ret == GEARMAN_IO_WAIT)
1636 {1644 {
1637 break;1645 break;
@@ -1639,56 +1647,56 @@
16391647
1640 if (gearman_failed(local_ret))1648 if (gearman_failed(local_ret))
1641 {1649 {
1642 client->impl()->state= GEARMAN_CLIENT_STATE_PACKET;1650 client->state= GEARMAN_CLIENT_STATE_PACKET;
1643 return local_ret;1651 return local_ret;
1644 }1652 }
16451653
1646 /* Clean up the packet. */1654 /* Clean up the packet. */
1647 client->impl()->con->free_private_packet();1655 client->con->free_private_packet();
16481656
1649 /* If exit task is set and matched, exit */1657 /* If exit task is set and matched, exit */
1650 if (exit_task)1658 if (exit_task)
1651 {1659 {
1652 if (exit_task->impl()->result_rc != GEARMAN_UNKNOWN_STATE)1660 if (exit_task->impl()->result_rc != GEARMAN_UNKNOWN_STATE)
1653 {1661 {
1654 client->impl()->state= GEARMAN_CLIENT_STATE_IDLE;1662 client->state= GEARMAN_CLIENT_STATE_IDLE;
1655 return GEARMAN_SUCCESS;1663 return GEARMAN_SUCCESS;
1656 }1664 }
1657 }1665 }
16581666
1659 /* If all tasks are done, return. */1667 /* If all tasks are done, return. */
1660 if (client->impl()->running_tasks == 0)1668 if (client->running_tasks == 0)
1661 {1669 {
1662 client->impl()->state= GEARMAN_CLIENT_STATE_IDLE;1670 client->state= GEARMAN_CLIENT_STATE_IDLE;
1663 return GEARMAN_SUCCESS;1671 return GEARMAN_SUCCESS;
1664 }1672 }
1665 }1673 }
1666 }1674 }
16671675
1668 /* If all tasks are done, return. */1676 /* If all tasks are done, return. */
1669 if (client->impl()->running_tasks == 0)1677 if (client->running_tasks == 0)
1670 {1678 {
1671 break;1679 break;
1672 }1680 }
16731681
1674 if (client->impl()->new_tasks > 0 and ! (client->impl()->options.no_new))1682 if (client->new_tasks > 0 and ! (client->options.no_new))
1675 {1683 {
1676 continue;1684 continue;
1677 }1685 }
16781686
1679 if (client->impl()->options.non_blocking)1687 if (client->options.non_blocking)
1680 {1688 {
1681 /* Let the caller wait for activity. */1689 /* Let the caller wait for activity. */
1682 client->impl()->state= GEARMAN_CLIENT_STATE_IDLE;1690 client->state= GEARMAN_CLIENT_STATE_IDLE;
16831691
1684 return gearman_gerror(client->impl()->universal, GEARMAN_IO_WAIT);1692 return gearman_gerror(client->universal, GEARMAN_IO_WAIT);
1685 }1693 }
16861694
1687 /* Wait for activity on one of the connections. */1695 /* Wait for activity on one of the connections. */
1688 gearman_return_t local_ret= gearman_wait(client->impl()->universal);1696 gearman_return_t local_ret= gearman_wait(client->universal);
1689 if (gearman_failed(local_ret) and local_ret != GEARMAN_IO_WAIT)1697 if (gearman_failed(local_ret) and local_ret != GEARMAN_IO_WAIT)
1690 {1698 {
1691 client->impl()->state= GEARMAN_CLIENT_STATE_IDLE;1699 client->state= GEARMAN_CLIENT_STATE_IDLE;
16921700
1693 return local_ret;1701 return local_ret;
1694 }1702 }
@@ -1697,73 +1705,76 @@
1697 break;1705 break;
1698 }1706 }
16991707
1700 client->impl()->state= GEARMAN_CLIENT_STATE_IDLE;1708 client->state= GEARMAN_CLIENT_STATE_IDLE;
17011709
1702 return GEARMAN_SUCCESS;1710 return GEARMAN_SUCCESS;
1703}1711}
17041712
1705gearman_return_t gearman_client_run_tasks(gearman_client_st *client)1713gearman_return_t gearman_client_run_tasks(gearman_client_st *client_shell)
1706{1714{
1707 if (client == NULL or client->impl() == NULL)1715 if (client_shell and client_shell->impl())
1708 {1716 {
1709 return GEARMAN_INVALID_ARGUMENT;1717 Client* client= client_shell->impl();
1710 }1718
17111719 if (client->task_list == NULL) // We are immediately successful if all tasks are completed
1712 if (client->impl()->task_list == NULL) // We are immediately successful if all tasks are completed1720 {
1713 {1721 return GEARMAN_SUCCESS;
1714 return GEARMAN_SUCCESS;1722 }
1715 }1723
17161724 gearman_return_t rc;
1717 gearman_return_t rc;1725 {
1718 {1726 PUSH_NON_BLOCKING(client->universal);
1719 PUSH_NON_BLOCKING(client->impl()->universal);1727
17201728 rc= _client_run_tasks(client_shell, NULL);
1721 rc= _client_run_tasks(client, NULL);1729 }
1722 }1730
1723
1724 if (rc == GEARMAN_COULD_NOT_CONNECT)
1725 {
1726 gearman_reset(client->impl()->universal);
1727 }
1728
1729 return rc;
1730}
1731
1732gearman_return_t gearman_client_run_block_tasks(gearman_client_st *shell, gearman_task_st* exit_task)
1733{
1734 if (shell == NULL)
1735 {
1736 return GEARMAN_INVALID_ARGUMENT;
1737 }
1738 Client *client= shell->impl();
1739
1740 if (client->task_list == NULL) // We are immediately successful if all tasks are completed
1741 {
1742 return GEARMAN_SUCCESS;
1743 }
1744
1745
1746 gearman_return_t rc;
1747 {
1748 PUSH_BLOCKING(client->universal);
1749
1750 rc= _client_run_tasks(shell, exit_task);
1751 }
1752
1753 if (gearman_failed(rc))
1754 {
1755 if (rc == GEARMAN_COULD_NOT_CONNECT)1731 if (rc == GEARMAN_COULD_NOT_CONNECT)
1756 {1732 {
1757 gearman_reset(client->universal);1733 gearman_reset(client->universal);
1758 }1734 }
17591735
1760 if (client->universal.error_code() != rc and rc != GEARMAN_COULD_NOT_CONNECT)1736 return rc;
1761 {1737 }
1762 assert(client->universal.error_code() == rc);1738
1763 }1739 return GEARMAN_INVALID_ARGUMENT;
1764 }1740}
17651741
1766 return rc;1742gearman_return_t gearman_client_run_block_tasks(gearman_client_st *client_shell, gearman_task_st* exit_task)
1743{
1744 if (client_shell and client_shell->impl())
1745 {
1746 Client *client= client_shell->impl();
1747
1748 if (client->task_list == NULL) // We are immediately successful if all tasks are completed
1749 {
1750 return GEARMAN_SUCCESS;
1751 }
1752
1753
1754 gearman_return_t rc;
1755 {
1756 PUSH_BLOCKING(client->universal);
1757
1758 rc= _client_run_tasks(client_shell, exit_task);
1759 }
1760
1761 if (gearman_failed(rc))
1762 {
1763 if (rc == GEARMAN_COULD_NOT_CONNECT)
1764 {
1765 gearman_reset(client->universal);
1766 }
1767
1768 if (client->universal.error_code() != rc and rc != GEARMAN_COULD_NOT_CONNECT)
1769 {
1770 assert(client->universal.error_code() == rc);
1771 }
1772 }
1773
1774 return rc;
1775 }
1776
1777 return GEARMAN_INVALID_ARGUMENT;
1767}1778}
17681779
1769/*1780/*
17701781
=== modified file 'libgearman/interface/task.hpp'
--- libgearman/interface/task.hpp 2013-07-06 01:22:31 +0000
+++ libgearman/interface/task.hpp 2013-07-06 19:18:28 +0000
@@ -2,7 +2,7 @@
2 * 2 *
3 * Gearmand client and server library.3 * Gearmand client and server library.
4 *4 *
5 * Copyright (C) 2012 Data Differential, http://datadifferential.com/5 * Copyright (C) 2012-2013 Data Differential, http://datadifferential.com/
6 * All rights reserved.6 * All rights reserved.
7 *7 *
8 * Redistribution and use in source and binary forms, with or without8 * Redistribution and use in source and binary forms, with or without
@@ -69,7 +69,7 @@
69 uint32_t numerator;69 uint32_t numerator;
70 uint32_t denominator;70 uint32_t denominator;
71 uint32_t client_count;71 uint32_t client_count;
72 gearman_client_st *client;72 Client *client;
73 gearman_task_st *next;73 gearman_task_st *next;
74 gearman_task_st *prev;74 gearman_task_st *prev;
75 void *context;75 void *context;
@@ -97,7 +97,7 @@
97 numerator(0),97 numerator(0),
98 denominator(0),98 denominator(0),
99 client_count(0),99 client_count(0),
100 client(&client_),100 client(client_.impl()),
101 next(NULL),101 next(NULL),
102 prev(NULL),102 prev(NULL),
103 context(NULL),103 context(NULL),
104104
=== modified file 'libgearman/run.cc'
--- libgearman/run.cc 2013-07-06 17:00:51 +0000
+++ libgearman/run.cc 2013-07-06 19:18:28 +0000
@@ -51,7 +51,7 @@
51 assert_msg(task->client, "Programmer error, somehow an invalid task was specified");51 assert_msg(task->client, "Programmer error, somehow an invalid task was specified");
52 if (task->client == NULL)52 if (task->client == NULL)
53 {53 {
54 return gearman_universal_set_error(task->client->impl()->universal, GEARMAN_INVALID_ARGUMENT, GEARMAN_AT,54 return gearman_universal_set_error(task->client->universal, GEARMAN_INVALID_ARGUMENT, GEARMAN_AT,
55 "Programmer error, somehow an invalid task was specified");55 "Programmer error, somehow an invalid task was specified");
56 }56 }
5757
@@ -59,16 +59,16 @@
59 {59 {
60 case GEARMAN_TASK_STATE_NEW:60 case GEARMAN_TASK_STATE_NEW:
61 61
62 if (task->client->impl()->universal.has_connections() == false)62 if (task->client->universal.has_connections() == false)
63 {63 {
64 assert(task->client->impl()->universal.con_count == 0);64 assert(task->client->universal.con_count == 0);
65 assert(task->client->impl()->universal.con_list == NULL);65 assert(task->client->universal.con_list == NULL);
66 task->client->impl()->new_tasks--;66 task->client->new_tasks--;
67 task->client->impl()->running_tasks--;67 task->client->running_tasks--;
68 return gearman_universal_set_error(task->client->impl()->universal, GEARMAN_NO_SERVERS, GEARMAN_AT, "no servers provided");68 return gearman_universal_set_error(task->client->universal, GEARMAN_NO_SERVERS, GEARMAN_AT, "no servers provided");
69 }69 }
7070
71 for (task->con= task->client->impl()->universal.con_list; task->con;71 for (task->con= task->client->universal.con_list; task->con;
72 task->con= task->con->next_connection())72 task->con= task->con->next_connection())
73 {73 {
74 if (task->con->send_state == GEARMAN_CON_SEND_STATE_NONE)74 if (task->con->send_state == GEARMAN_CON_SEND_STATE_NONE)
@@ -79,11 +79,11 @@
7979
80 if (task->con == NULL)80 if (task->con == NULL)
81 {81 {
82 task->client->impl()->options.no_new= true;82 task->client->options.no_new= true;
83 return gearman_gerror(task->client->impl()->universal, GEARMAN_IO_WAIT);83 return gearman_gerror(task->client->universal, GEARMAN_IO_WAIT);
84 }84 }
8585
86 task->client->impl()->new_tasks--;86 task->client->new_tasks--;
8787
88 if (task->send.command != GEARMAN_COMMAND_GET_STATUS)88 if (task->send.command != GEARMAN_COMMAND_GET_STATUS)
89 {89 {
@@ -95,7 +95,7 @@
95 while (1)95 while (1)
96 {96 {
97 assert(task->con);97 assert(task->con);
98 gearman_return_t ret= task->con->send_packet(task->send, task->client->impl()->new_tasks == 0 ? true : false);98 gearman_return_t ret= task->con->send_packet(task->send, task->client->new_tasks == 0 ? true : false);
9999
100 if (gearman_success(ret))100 if (gearman_success(ret))
101 {101 {
@@ -134,13 +134,23 @@
134134
135 if (ret == GEARMAN_COULD_NOT_CONNECT) // If no connection is found, we will let the user try again135 if (ret == GEARMAN_COULD_NOT_CONNECT) // If no connection is found, we will let the user try again
136 {136 {
137<<<<<<< TREE
137 task->set_state(GEARMAN_TASK_STATE_NEW);138 task->set_state(GEARMAN_TASK_STATE_NEW);
138 task->client->impl()->new_tasks++;139 task->client->impl()->new_tasks++;
140=======
141 task->set_state(GEARMAN_TASK_STATE_NEW);
142 task->client->new_tasks++;
143>>>>>>> MERGE-SOURCE
139 }144 }
140 else145 else
141 {146 {
147<<<<<<< TREE
142 task->set_state(GEARMAN_TASK_STATE_FAIL);148 task->set_state(GEARMAN_TASK_STATE_FAIL);
143 task->client->impl()->running_tasks--;149 task->client->impl()->running_tasks--;
150=======
151 task->set_state(GEARMAN_TASK_STATE_FAIL);
152 task->client->running_tasks--;
153>>>>>>> MERGE-SOURCE
144 }154 }
145 return ret;155 return ret;
146 }156 }
@@ -157,7 +167,7 @@
157 {167 {
158 if (not task->func.workload_fn)168 if (not task->func.workload_fn)
159 {169 {
160 gearman_error(task->client->impl()->universal, GEARMAN_NEED_WORKLOAD_FN,170 gearman_error(task->client->universal, GEARMAN_NEED_WORKLOAD_FN,
161 "workload size > 0, but no data pointer or workload_fn was given");171 "workload size > 0, but no data pointer or workload_fn was given");
162 return GEARMAN_NEED_WORKLOAD_FN;172 return GEARMAN_NEED_WORKLOAD_FN;
163 }173 }
@@ -171,8 +181,13 @@
171 }181 }
172 }182 }
173183
184<<<<<<< TREE
174 task->client->impl()->options.no_new= false;185 task->client->impl()->options.no_new= false;
175 task->set_state(GEARMAN_TASK_STATE_WORK);186 task->set_state(GEARMAN_TASK_STATE_WORK);
187=======
188 task->client->options.no_new= false;
189 task->set_state(GEARMAN_TASK_STATE_WORK);
190>>>>>>> MERGE-SOURCE
176 task->con->set_events(POLLIN);191 task->con->set_events(POLLIN);
177 return GEARMAN_SUCCESS;192 return GEARMAN_SUCCESS;
178193
@@ -407,6 +422,7 @@
407 break;422 break;
408 }423 }
409424
425<<<<<<< TREE
410 task->client->impl()->running_tasks--;426 task->client->impl()->running_tasks--;
411 task->set_state(GEARMAN_TASK_STATE_FINISHED);427 task->set_state(GEARMAN_TASK_STATE_FINISHED);
412428
@@ -414,6 +430,14 @@
414 assert(task->result_rc != GEARMAN_UNKNOWN_STATE);430 assert(task->result_rc != GEARMAN_UNKNOWN_STATE);
415431
416 if (task->client->impl()->options.free_tasks and task->type == GEARMAN_TASK_KIND_ADD_TASK)432 if (task->client->impl()->options.free_tasks and task->type == GEARMAN_TASK_KIND_ADD_TASK)
433=======
434 task->client->running_tasks--;
435 task->set_state(GEARMAN_TASK_STATE_FINISHED);
436
437 assert(task->result_rc != GEARMAN_UNKNOWN_STATE);
438
439 if (task->client->options.free_tasks and task->type == GEARMAN_TASK_KIND_ADD_TASK)
440>>>>>>> MERGE-SOURCE
417 {441 {
418 gearman_task_free(task->shell());442 gearman_task_free(task->shell());
419 }443 }
420444
=== modified file 'libgearman/task.cc'
--- libgearman/task.cc 2013-05-11 11:15:03 +0000
+++ libgearman/task.cc 2013-07-06 19:18:28 +0000
@@ -59,7 +59,6 @@
59 Task* task= new (std::nothrow) Task(client, task_shell);59 Task* task= new (std::nothrow) Task(client, task_shell);
60 if (task)60 if (task)
61 {61 {
62 assert(task->client == &client);
63 return task->shell();62 return task->shell();
64 }63 }
6564
@@ -69,6 +68,10 @@
69 return NULL;68 return NULL;
70}69}
7170
71void gearman_task_free(Task* task)
72{
73 gearman_task_free(task->shell());
74}
7275
73void gearman_task_free(gearman_task_st *task_shell)76void gearman_task_free(gearman_task_st *task_shell)
74{77{
@@ -91,14 +94,14 @@
91 gearman_packet_free(&(task->send));94 gearman_packet_free(&(task->send));
92 }95 }
9396
94 if (task->type != GEARMAN_TASK_KIND_DO and task->context and task->client->impl()->task_context_free_fn)97 if (task->type != GEARMAN_TASK_KIND_DO and task->context and task->client->task_context_free_fn)
95 {98 {
96 task->client->impl()->task_context_free_fn(task_shell, static_cast<void *>(task->context));99 task->client->task_context_free_fn(task_shell, static_cast<void *>(task->context));
97 }100 }
98101
99 if (task->client->impl()->task_list == task_shell)102 if (task->client->task_list == task_shell)
100 {103 {
101 task->client->impl()->task_list= task->next;104 task->client->task_list= task->next;
102 }105 }
103106
104 if (task->prev)107 if (task->prev)
@@ -111,13 +114,13 @@
111 task->next->impl()->prev= task->prev;114 task->next->impl()->prev= task->prev;
112 }115 }
113116
114 task->client->impl()->task_count--;117 task->client->task_count--;
115118
116 // If the task we are removing is a current task, remove it from the client119 // If the task we are removing is a current task, remove it from the client
117 // structures.120 // structures.
118 if (task->client->impl()->task == task_shell)121 if (task->client->task == task_shell)
119 {122 {
120 task->client->impl()->task= NULL;123 task->client->task= NULL;
121 }124 }
122 task->client= NULL;125 task->client= NULL;
123 }126 }
124127
=== modified file 'libgearman/task.hpp'
--- libgearman/task.hpp 2013-01-31 05:27:46 +0000
+++ libgearman/task.hpp 2013-07-06 19:18:28 +0000
@@ -49,6 +49,8 @@
49gearman_task_st *gearman_task_internal_create(gearman_client_st& client,49gearman_task_st *gearman_task_internal_create(gearman_client_st& client,
50 gearman_task_st *task);50 gearman_task_st *task);
5151
52void gearman_task_free(Task* task);
53
52void gearman_task_clear_fn(gearman_task_st *task);54void gearman_task_clear_fn(gearman_task_st *task);
5355
54bool gearman_task_is_active(const gearman_task_st *self);56bool gearman_task_is_active(const gearman_task_st *self);

Subscribers

People subscribed via source and target branches

to all changes: