Merge lp:~clint-fewbar/gearmand/time-order into lp:gearmand/1.0

Proposed by Clint Byrum on 2010-01-29
Status: Work in progress
Proposed branch: lp:~clint-fewbar/gearmand/time-order
Merge into: lp:gearmand/1.0
Diff against target: 566 lines (+317/-35)
10 files modified
gearmand/gearmand.c (+5/-0)
libgearman-server/gearmand.c (+5/-0)
libgearman-server/gearmand.h (+9/-0)
libgearman-server/job.c (+52/-33)
libgearman-server/job.h (+1/-0)
libgearman-server/server.c (+7/-0)
libgearman-server/server.h (+12/-0)
tests/include.am (+9/-2)
tests/test_gearmand.c (+20/-0)
tests/time_order_test.c (+197/-0)
To merge this branch: bzr merge lp:~clint-fewbar/gearmand/time-order
Reviewer Review Type Date Requested Status
Clint Byrum (community) Needs Fixing on 2010-02-02
Brian Aker (community) 2010-01-29 Needs Fixing on 2010-01-31
Review via email: mp+18238@code.launchpad.net
To post a comment you must log in.
Clint Byrum (clint-fewbar) wrote :

This is the result of merging lp:~coryb/gearmand/time-order by hand, after I accidentally blocked his merge by merging it in, then reverting it rather than uncommitting it.

* Adds support for --time-order, which makes the server send jobs to workers in the order they were received.

Brian Aker (brianaker) wrote :

From Fedora I am still getting this when I merge:

[brian@gaz gearmand]$ tests/time_order_test

time_order

        Testing add 0.001 [ ok ]
        Testing worker [ failed ]

All tests completed successfully

Total 2
        Failed 1
        Skipped 0
        Succeeded 1

review: Needs Fixing
Clint Byrum (clint-fewbar) wrote :

I have seen that test fail intermittently now too. I am afraid it is a race condition. Suggest that this branch not be merged pending further testing.

review: Needs Fixing

Unmerged revisions

321. By Clint Byrum on 2010-01-29

Files missed in hand-merging

320. By Clint Byrum on 2010-01-29

Merge time-order part 2

319. By Clint Byrum on 2010-01-29

Merging time-order branch part 1

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'gearmand/gearmand.c'
2--- gearmand/gearmand.c 2010-01-28 21:45:14 +0000
3+++ gearmand/gearmand.c 2010-01-29 00:26:11 +0000
4@@ -114,6 +114,7 @@
5 uint32_t threads= 0;
6 const char *user= NULL;
7 uint8_t verbose= 0;
8+ uint8_t time_order = 0;
9 gearman_return_t ret;
10 gearmand_log_info_st log_info;
11 bool close_stdio= false;
12@@ -159,6 +160,7 @@
13 MCO("protocol", 'r', "PROTOCOL", "Load protocol module.")
14 MCO("queue-type", 'q', "QUEUE", "Persistent queue type to use.")
15 MCO("threads", 't', "THREADS", "Number of I/O threads to use. Default=0.")
16+ MCO("time-order", 'T', NULL, "Dispatch jobs in order received")
17 MCO("user", 'u', "USER", "Switch to given user after startup.")
18 MCO("verbose", 'v', NULL, "Increase verbosity level by one.")
19 MCO("version", 'V', NULL, "Display the version of gearmand and exit.")
20@@ -290,6 +292,8 @@
21 queue_type= value;
22 else if (!strcmp(name, "threads"))
23 threads= (uint32_t)atoi(value);
24+ else if (!strcmp(name, "time-order"))
25+ time_order++;
26 else if (!strcmp(name, "user"))
27 user= value;
28 else if (!strcmp(name, "verbose"))
29@@ -351,6 +355,7 @@
30 gearmand_set_job_retries(_gearmand, job_retries);
31 gearmand_set_worker_wakeup(_gearmand, worker_wakeup);
32 gearmand_set_log_fn(_gearmand, _log, &log_info, verbose);
33+ gearmand_set_time_order(_gearmand, time_order);
34
35 if (queue_type != NULL)
36 {
37
38=== modified file 'libgearman-server/gearmand.c'
39--- libgearman-server/gearmand.c 2010-01-28 21:45:14 +0000
40+++ libgearman-server/gearmand.c 2010-01-29 00:26:11 +0000
41@@ -165,6 +165,11 @@
42 gearmand->verbose= verbose;
43 }
44
45+void gearmand_set_time_order(gearmand_st *gearmand, uint8_t time_order)
46+{
47+ gearman_server_set_time_order(&gearmand->server,time_order);
48+}
49+
50 gearman_return_t gearmand_port_add(gearmand_st *gearmand, in_port_t port,
51 gearman_connection_add_fn *function)
52 {
53
54=== modified file 'libgearman-server/gearmand.h'
55--- libgearman-server/gearmand.h 2010-01-28 21:45:14 +0000
56+++ libgearman-server/gearmand.h 2010-01-29 00:26:11 +0000
57@@ -131,6 +131,15 @@
58 void gearmand_set_threads(gearmand_st *gearmand, uint32_t threads);
59
60 /**
61+ * Set option to order jobs by time
62+ * @param gearmand Server instance structure previously initialized with
63+ * gearmand_create.
64+ * @param boolean value, 0 (default) will not order jobs by time
65+ */
66+GEARMAN_API
67+void gearmand_set_time_order(gearmand_st *gearmand, uint8_t time_order);
68+
69+/**
70 * Set logging callback for server instance.
71 * @param gearmand Server instance structure previously initialized with
72 * gearmand_create.
73
74=== modified file 'libgearman-server/job.c'
75--- libgearman-server/job.c 2010-01-28 21:45:14 +0000
76+++ libgearman-server/job.c 2010-01-29 00:26:11 +0000
77@@ -110,6 +110,9 @@
78 return NULL;
79 }
80
81+ if( server->time_order )
82+ gettimeofday(&server_job->insert_time, NULL);
83+
84 server_job->priority= priority;
85
86 server_job->function= server_function;
87@@ -219,6 +222,8 @@
88 server_job->options= 0;
89
90 server_job->retries= 0;
91+ server_job->insert_time.tv_sec= 0;
92+ server_job->insert_time.tv_usec= 0;
93 server_job->priority= 0;
94 server_job->job_handle_key= 0;
95 server_job->unique_key= 0;
96@@ -298,12 +303,12 @@
97 return NULL;
98 }
99
100-gearman_server_job_st *
101-gearman_server_job_peek(gearman_server_con_st *server_con)
102-{
103+static gearman_server_job_st *
104+gearman_server_next_job(gearman_server_con_st *server_con) {
105 gearman_server_worker_st *server_worker;
106+ gearman_server_job_st *server_job=NULL, *job_cursor = NULL;
107 gearman_job_priority_t priority;
108-
109+
110 for (server_worker= server_con->worker_list; server_worker != NULL;
111 server_worker= server_worker->con_next)
112 {
113@@ -314,50 +319,64 @@
114 {
115 if (server_worker->function->job_list[priority] != NULL)
116 {
117- if (server_worker->function->job_list[priority]->options &
118- GEARMAN_SERVER_JOB_IGNORE)
119- {
120- /* This is only happens when a client disconnects from a foreground
121- job. We do this because we don't want to run the job anymore. */
122- server_worker->function->job_list[priority]->options&=
123- (gearman_server_job_options_t)~GEARMAN_SERVER_JOB_IGNORE;
124- gearman_server_job_free(gearman_server_job_take(server_con));
125- return gearman_server_job_peek(server_con);
126- }
127- return server_worker->function->job_list[priority];
128+ job_cursor = server_worker->function->job_list[priority];
129+ if (!server_job)
130+ {
131+ server_job = job_cursor;
132+ server_job->worker = server_worker;
133+ if( !server_job->server->time_order ) {
134+ /* if not order by time, just return on the first
135+ found job */
136+ return server_job;
137+ }
138+ }
139+ else if (job_cursor->priority < server_job->priority ||
140+ (job_cursor->priority == server_job->priority &&
141+ job_cursor->insert_time.tv_sec < server_job->insert_time.tv_sec) ||
142+ (job_cursor->priority == server_job->priority &&
143+ job_cursor->insert_time.tv_sec == server_job->insert_time.tv_sec &&
144+ job_cursor->insert_time.tv_usec < server_job->insert_time.tv_usec))
145+ {
146+ /* job is either higher priority or has been sitting in the queue longer,
147+ so do it first */
148+ server_job = job_cursor;
149+ server_job->worker = server_worker;
150+ continue;
151+ }
152 }
153 }
154 }
155 }
156+ return server_job;
157+}
158
159- return NULL;
160+gearman_server_job_st *
161+gearman_server_job_peek(gearman_server_con_st *server_con)
162+{
163+ gearman_server_job_st * server_job = gearman_server_next_job(server_con);
164+ if(!server_job)
165+ return NULL;
166+ if (server_job->options & GEARMAN_SERVER_JOB_IGNORE)
167+ {
168+ server_job->options &= (gearman_server_job_options_t)~GEARMAN_SERVER_JOB_IGNORE;
169+ gearman_server_job_free(gearman_server_job_take(server_con));
170+ return gearman_server_job_peek(server_con);
171+ }
172+ return server_job;
173 }
174
175 gearman_server_job_st *
176 gearman_server_job_take(gearman_server_con_st *server_con)
177 {
178 gearman_server_worker_st *server_worker;
179- gearman_server_job_st *server_job;
180 gearman_job_priority_t priority;
181-
182- for (server_worker= server_con->worker_list; server_worker != NULL;
183- server_worker= server_worker->con_next)
184- {
185- if (server_worker->function->job_count != 0)
186- break;
187- }
188-
189- if (server_worker == NULL)
190+ gearman_server_job_st * server_job = gearman_server_next_job(server_con);
191+ if(!server_job)
192 return NULL;
193
194- for (priority= GEARMAN_JOB_PRIORITY_HIGH;
195- priority != GEARMAN_JOB_PRIORITY_MAX; priority++)
196- {
197- if (server_worker->function->job_list[priority] != NULL)
198- break;
199- }
200+ server_worker = server_job->worker;
201+ priority = server_job->priority;
202
203- server_job= server_worker->function->job_list[priority];
204 server_job->function->job_list[priority]= server_job->function_next;
205 if (server_job->function->job_end[priority] == server_job)
206 server_job->function->job_end[priority]= NULL;
207
208=== modified file 'libgearman-server/job.h'
209--- libgearman-server/job.h 2010-01-28 21:45:14 +0000
210+++ libgearman-server/job.h 2010-01-29 00:26:11 +0000
211@@ -53,6 +53,7 @@
212 gearman_server_worker_st *worker;
213 char job_handle[GEARMAN_JOB_HANDLE_SIZE];
214 char unique[GEARMAN_UNIQUE_SIZE];
215+ struct timeval insert_time;
216 };
217
218 /**
219
220=== modified file 'libgearman-server/server.c'
221--- libgearman-server/server.c 2010-01-28 21:45:14 +0000
222+++ libgearman-server/server.c 2010-01-29 00:26:11 +0000
223@@ -106,6 +106,7 @@
224 server->queue_flush_fn= NULL;
225 server->queue_done_fn= NULL;
226 server->queue_replay_fn= NULL;
227+ server->time_order= 0;
228 memset(server->job_hash, 0,
229 sizeof(gearman_server_job_st *) * GEARMAN_JOB_HASH_SIZE);
230 memset(server->unique_hash, 0,
231@@ -207,6 +208,12 @@
232 gearman_set_log_fn(server->gearman, _log, server, verbose);
233 }
234
235+void gearman_server_set_time_order(gearman_server_st *server,
236+ uint8_t time_order)
237+{
238+ server->time_order= time_order;
239+}
240+
241 gearman_return_t gearman_server_run_command(gearman_server_con_st *server_con,
242 gearman_packet_st *packet)
243 {
244
245=== modified file 'libgearman-server/server.h'
246--- libgearman-server/server.h 2010-01-28 21:45:14 +0000
247+++ libgearman-server/server.h 2010-01-29 00:26:11 +0000
248@@ -63,6 +63,7 @@
249 char job_handle_prefix[GEARMAN_JOB_HANDLE_SIZE];
250 gearman_server_job_st *job_hash[GEARMAN_JOB_HASH_SIZE];
251 gearman_server_job_st *unique_hash[GEARMAN_JOB_HASH_SIZE];
252+ uint8_t time_order;
253 };
254
255 /**
256@@ -117,6 +118,17 @@
257 void *context, gearman_verbose_t verbose);
258
259 /**
260+ * Set option to order jobs by time
261+ * @param server Server structure previously initialized with
262+ * gearman_server_create.
263+ * @param boolean value, 0 (default) will not order jobs by time
264+ */
265+GEARMAN_API
266+void gearman_server_set_time_order(gearman_server_st *server,
267+ uint8_t time_order);
268+
269+
270+/**
271 * Process commands for a connection.
272 * @param server_con Server connection that has a packet to process.
273 * @param packet The packet that needs processing.
274
275=== modified file 'tests/include.am'
276--- tests/include.am 2010-01-28 21:45:14 +0000
277+++ tests/include.am 2010-01-29 00:26:11 +0000
278@@ -59,7 +59,8 @@
279 tests/cpp_test \
280 tests/internals_test \
281 tests/regression_test \
282- tests/worker_test
283+ tests/worker_test \
284+ tests/time_order_test
285
286 noinst_HEADERS+= \
287 tests/test.h \
288@@ -78,6 +79,9 @@
289 tests_worker_test_SOURCES= tests/worker_test.c
290 tests_worker_test_LDADD= ${TEST_LDADD}
291
292+tests_time_order_test_SOURCES= tests/time_order_test.c
293+tests_time_order_test_LDADD= ${TEST_LDADD}
294+
295 # Test linking with C++ application
296 tests_cpp_test_SOURCES= tests/cpp_test.cc
297 tests_cpp_test_LDADD= ${TEST_LDADD}
298@@ -88,6 +92,9 @@
299 test-worker:
300 tests/worker_test $(ARG1) $(ARG2)
301
302+test-time-order:
303+ tests/time_order_test $(ARG1) $(ARG2)
304+
305 test-internals:
306 tests/internals_test $(ARG1) $(ARG2)
307
308@@ -104,7 +111,7 @@
309 $(TOKYOCABINET_TEST) $(ARG1) $(ARG2)
310 $(TOKYOCABINET_RM)
311
312-check-local: test-client test-worker test-internals test-libmemcached test-sqlite test-tokyocabinet
313+check-local: test-client test-worker test-time-order test-internals test-libmemcached test-sqlite test-tokyocabinet
314
315 gdb-client: ${noinst_PROGRAMS}
316 $(LIBTOOL) --mode=execute gdb tests/client_test
317
318=== modified file 'tests/test_gearmand.c'
319--- tests/test_gearmand.c 2010-01-28 21:45:14 +0000
320+++ tests/test_gearmand.c 2010-01-29 00:26:11 +0000
321@@ -46,6 +46,10 @@
322 pid_t gearmand_pid;
323 gearmand_st *gearmand;
324 gearman_conf_st conf;
325+ gearman_conf_module_st module;
326+ const char *name;
327+ const char *value;
328+ uint8_t time_order = 0;
329
330 gearmand_pid= fork();
331 assert(gearmand_pid != -1);
332@@ -57,6 +61,13 @@
333
334 conf_ptr= gearman_conf_create(&conf);
335 assert(conf_ptr);
336+
337+ assert( gearman_conf_module_create(&conf, &module, NULL) );
338+
339+#define MCO(__name, __short, __value, __help) \
340+ gearman_conf_module_add_option(&module, __name, __short, __value, __help);
341+ MCO("time-order", 'T', NULL, "Dispatch jobs in order received")
342+
343 #ifdef HAVE_LIBDRIZZLE
344 ret= gearman_server_queue_libdrizzle_conf(&conf);
345 assert(ret == GEARMAN_SUCCESS);
346@@ -77,9 +88,18 @@
347 ret= gearman_conf_parse_args(&conf, argc, argv);
348 assert(ret == GEARMAN_SUCCESS);
349
350+ /* Check for option values that were given. */
351+ while (gearman_conf_module_value(&module, &name, &value))
352+ {
353+ if (!strcmp(name, "time-order"))
354+ time_order++;
355+ }
356+
357 gearmand= gearmand_create(NULL, port);
358 assert(gearmand != NULL);
359
360+ gearmand_set_time_order(gearmand, time_order);
361+
362 if (queue_type != NULL)
363 {
364 assert(argc);
365
366=== added file 'tests/time_order_test.c'
367--- tests/time_order_test.c 1970-01-01 00:00:00 +0000
368+++ tests/time_order_test.c 2010-01-29 00:26:11 +0000
369@@ -0,0 +1,197 @@
370+/* Gearman server and library
371+ * Copyright (C) 2008 Brian Aker, Eric Day
372+ * All rights reserved.
373+ *
374+ * Use and distribution licensed under the BSD license. See
375+ * the COPYING file in the parent directory for full text.
376+ */
377+
378+#include "config.h"
379+
380+#if defined(NDEBUG)
381+# undef NDEBUG
382+#endif
383+
384+#include <assert.h>
385+#include <stdio.h>
386+#include <stdlib.h>
387+#include <string.h>
388+#include <unistd.h>
389+
390+#include <libgearman/gearman.h>
391+#include "test.h"
392+#include "test_gearmand.h"
393+
394+#define WORKER_TEST_PORT 32123
395+
396+typedef struct
397+{
398+ pid_t gearmand_pid;
399+ gearman_worker_st worker;
400+ bool run_worker;
401+} worker_test_st;
402+
403+/* Prototypes */
404+test_return_t queue_add(void *object);
405+test_return_t queue_worker(void *object);
406+
407+test_return_t pre(void *object);
408+test_return_t post(void *object);
409+
410+void *world_create(test_return_t *error);
411+test_return_t world_destroy(void *object);
412+
413+/* append test for worker */
414+static void *append_function(gearman_job_st *job __attribute__((unused)),
415+ void *context, size_t *result_size,
416+ gearman_return_t *ret_ptr __attribute__((unused)))
417+{
418+ /* this will will set the last char in the context (buffer) to the */
419+ /* first char of the work */
420+ char * buf = (char *)context;
421+ char * work = (char *)gearman_job_workload(job);
422+ buf += strlen(buf);
423+ *buf = *work;
424+ *result_size= 0;
425+ return NULL;
426+}
427+
428+test_return_t queue_add(void *object)
429+{
430+ worker_test_st *test= (worker_test_st *)object;
431+ gearman_client_st client;
432+ char job_handle[GEARMAN_JOB_HANDLE_SIZE];
433+
434+ uint8_t * value= (uint8_t *)strdup("0");
435+ size_t value_length= 1;
436+ uint8_t i;
437+
438+ test->run_worker= false;
439+
440+ if (gearman_client_create(&client) == NULL)
441+ return TEST_FAILURE;
442+
443+ if (gearman_client_add_server(&client, NULL,
444+ WORKER_TEST_PORT) != GEARMAN_SUCCESS)
445+ {
446+ return TEST_FAILURE;
447+ }
448+
449+ /* send strings "0", "1" ... "9" to alternating between 2 queues */
450+ /* queue1 = 1,3,5,7,9 */
451+ /* queue2 = 0,2,4,6,8 */
452+ for (i=0; i<10; i++) {
453+ if (gearman_client_do_background(&client, i % 2 ? "queue1" : "queue2", NULL, value,
454+ value_length, job_handle) != GEARMAN_SUCCESS)
455+ {
456+ return TEST_FAILURE;
457+ }
458+ *value = (uint8_t)(*value + 1);
459+ }
460+
461+ gearman_client_free(&client);
462+ free(value);
463+
464+ test->run_worker= true;
465+ return TEST_SUCCESS;
466+}
467+
468+test_return_t queue_worker(void *object)
469+{
470+ worker_test_st *test= (worker_test_st *)object;
471+ gearman_worker_st *worker= &(test->worker);
472+ char buffer[11];
473+ int i;
474+ memset(buffer, 0, 11);
475+
476+ if (!test->run_worker)
477+ return TEST_FAILURE;
478+
479+ if (gearman_worker_add_function(worker, "queue1", 5, append_function,
480+ buffer) != GEARMAN_SUCCESS)
481+ {
482+ return TEST_FAILURE;
483+ }
484+
485+ if (gearman_worker_add_function(worker, "queue2", 5, append_function,
486+ buffer) != GEARMAN_SUCCESS)
487+ {
488+ return TEST_FAILURE;
489+ }
490+
491+ for (i=0; i<10; i++) {
492+ if (gearman_worker_work(worker) != GEARMAN_SUCCESS)
493+ return TEST_FAILURE;
494+ }
495+
496+ // expect buffer to be reassembled in order
497+ // of insert, not the default "sticky queue" order
498+ if( strcmp(buffer, "0123456789") )
499+ return TEST_FAILURE;
500+
501+ return TEST_SUCCESS;
502+}
503+
504+
505+void *world_create(test_return_t *error)
506+{
507+ worker_test_st *test;
508+ const char *argv[2]= { "test_gearmand", "--time-order"};
509+ pid_t gearmand_pid;
510+
511+ gearmand_pid= test_gearmand_start(WORKER_TEST_PORT, NULL, (char **)argv, 2);
512+
513+ test= malloc(sizeof(worker_test_st));
514+ if (! test)
515+ {
516+ *error= TEST_MEMORY_ALLOCATION_FAILURE;
517+ return NULL;
518+ }
519+
520+ memset(test, 0, sizeof(worker_test_st));
521+ if (gearman_worker_create(&(test->worker)) == NULL)
522+ {
523+ *error= TEST_FAILURE;
524+ return NULL;
525+ }
526+
527+ if (gearman_worker_add_server(&(test->worker), NULL, WORKER_TEST_PORT) != GEARMAN_SUCCESS)
528+ {
529+ *error= TEST_FAILURE;
530+ return NULL;
531+ }
532+
533+ test->gearmand_pid= gearmand_pid;
534+
535+ *error= TEST_SUCCESS;
536+
537+ return (void *)test;
538+}
539+
540+test_return_t world_destroy(void *object)
541+{
542+ worker_test_st *test= (worker_test_st *)object;
543+ gearman_worker_free(&(test->worker));
544+ test_gearmand_stop(test->gearmand_pid);
545+ free(test);
546+
547+ return TEST_SUCCESS;
548+}
549+
550+test_st tests[] ={
551+ {"add", 0, queue_add },
552+ {"worker", 0, queue_worker },
553+ {0, 0, 0}
554+};
555+
556+collection_st collection[] ={
557+ {"time_order", 0, 0, tests},
558+ {0, 0, 0, 0}
559+};
560+
561+void get_world(world_st *world)
562+{
563+ world->collections= collection;
564+ world->create= world_create;
565+ world->destroy= world_destroy;
566+}

Subscribers

People subscribed via source and target branches