Merge lp:~coryb/gearmand/time-order into lp:gearmand/1.0

Proposed by coryb
Status: Merged
Merged at revision: not available
Proposed branch: lp:~coryb/gearmand/time-order
Merge into: lp:gearmand/1.0
Diff against target: 566 lines (+317/-35)
10 files modified
gearmand/gearmand.c (+5/-0)
libgearman-server/gearmand.c (+5/-0)
libgearman-server/gearmand.h (+9/-0)
libgearman-server/job.c (+52/-33)
libgearman-server/job.h (+1/-0)
libgearman-server/server.c (+7/-0)
libgearman-server/server.h (+12/-0)
tests/include.am (+9/-2)
tests/test_gearmand.c (+20/-0)
tests/time_order_test.c (+197/-0)
To merge this branch: bzr merge lp:~coryb/gearmand/time-order
Reviewer Review Type Date Requested Status
Gearman-developers Pending
Review via email: mp+17782@code.launchpad.net
To post a comment you must log in.
Revision history for this message
coryb (coryb) wrote :

This branch adds the --time-order option to gearmand to dispatch jobs in FIFO order regardless of which queue the job is in.

I have merged in all the changes made to trunk so now this branch should merge cleanly to lp:gearmand.

I have also added a test case: tests/time_order_test

The context for this patch was discussed on the gearman google group:
http://groups.google.com/group/gearman/browse_thread/thread/f945197af727262d/f518dfe496724476

Revision history for this message
Brian Aker (brianaker) wrote :

If you take a look at http://hudson.drizzle.org/ at the Gearman queue you can see that your test is sometimes failing. Any ideas?

It is currently in the lp:gearmand/build tree

Thanks!

Revision history for this message
coryb (coryb) wrote :

Thanks Brian!

> If you take a look at http://hudson.drizzle.org/ at the Gearman queue you can
> see that your test is sometimes failing. Any ideas?

Could you point me to one of the builds that failed due to the time_order_test failing?
I spent some time looking at the builds for Jan-21 in the Gearman-build group, and I didn't see any of them where time_order_test failed. There were several that failed on the sqlite_test, but I didn't change that code, so I am not sure how it could be related exactly.

At one point when I was testing my code, the tests started failing because a rogue test was running in the background, due to the fork in test_gearmand_start not getting cleaned up. I suspect some test called test_gearmand_start and then failed to call test_gearmand_stop. I also noticed that in world_create in both sqlite_test.c and my time_order_test.c (which was mostly copied from sqlite_test.c) the world_create calls test_gearmand_start but then there are several error cases where we return but fail to call
test_gearmand_stop before returning. It is probably an edge case, but I think that could be triggering the sqlite_test failures. Since I can't log into the build machine I don't think I can debug this test case, but maybe you can log in and check to see if anything is bound to port 32123?

> It is currently in the lp:gearmand/build tree

Awesome, thanks!
-Cory

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'gearmand/gearmand.c'
2--- gearmand/gearmand.c 2009-12-02 05:22:18 +0000
3+++ gearmand/gearmand.c 2010-01-20 21:27:13 +0000
4@@ -110,6 +110,7 @@
5 uint32_t threads= 0;
6 const char *user= NULL;
7 uint8_t verbose= 0;
8+ uint8_t time_order = 0;
9 gearman_return_t ret;
10 gearmand_log_info_st log_info;
11 bool close_stdio= false;
12@@ -155,6 +156,7 @@
13 MCO("protocol", 'r', "PROTOCOL", "Load protocol module.")
14 MCO("queue-type", 'q', "QUEUE", "Persistent queue type to use.")
15 MCO("threads", 't', "THREADS", "Number of I/O threads to use. Default=0.")
16+ MCO("time-order", 'T', NULL, "Dispatch jobs in order received")
17 MCO("user", 'u', "USER", "Switch to given user after startup.")
18 MCO("verbose", 'v', NULL, "Increase verbosity level by one.")
19 MCO("version", 'V', NULL, "Display the version of gearmand and exit.")
20@@ -278,6 +280,8 @@
21 queue_type= value;
22 else if (!strcmp(name, "threads"))
23 threads= (uint32_t)atoi(value);
24+ else if (!strcmp(name, "time-order"))
25+ time_order++;
26 else if (!strcmp(name, "user"))
27 user= value;
28 else if (!strcmp(name, "verbose"))
29@@ -336,6 +340,7 @@
30
31 gearmand_set_backlog(_gearmand, backlog);
32 gearmand_set_threads(_gearmand, threads);
33+ gearmand_set_time_order(_gearmand, time_order);
34 gearmand_set_job_retries(_gearmand, job_retries);
35 gearmand_set_worker_wakeup(_gearmand, worker_wakeup);
36 gearmand_set_log_fn(_gearmand, _log, &log_info, verbose);
37
38=== modified file 'libgearman-server/gearmand.c'
39--- libgearman-server/gearmand.c 2010-01-02 19:57:26 +0000
40+++ libgearman-server/gearmand.c 2010-01-20 21:27:13 +0000
41@@ -165,6 +165,11 @@
42 gearmand->verbose= verbose;
43 }
44
45+void gearmand_set_time_order(gearmand_st *gearmand, uint8_t time_order)
46+{
47+ gearman_server_set_time_order(&gearmand->server,time_order);
48+}
49+
50 gearman_return_t gearmand_port_add(gearmand_st *gearmand, in_port_t port,
51 gearman_connection_add_fn *function)
52 {
53
54=== modified file 'libgearman-server/gearmand.h'
55--- libgearman-server/gearmand.h 2010-01-02 19:57:26 +0000
56+++ libgearman-server/gearmand.h 2010-01-20 21:27:13 +0000
57@@ -131,6 +131,15 @@
58 void gearmand_set_threads(gearmand_st *gearmand, uint32_t threads);
59
60 /**
61+ * Set option to order jobs by time
62+ * @param gearmand Server instance structure previously initialized with
63+ * gearmand_create.
64+ * @param boolean value, 0 (default) will not order jobs by time
65+ */
66+GEARMAN_API
67+void gearmand_set_time_order(gearmand_st *gearmand, uint8_t time_order);
68+
69+/**
70 * Set logging callback for server instance.
71 * @param gearmand Server instance structure previously initialized with
72 * gearmand_create.
73
74=== modified file 'libgearman-server/job.c'
75--- libgearman-server/job.c 2009-10-29 22:01:39 +0000
76+++ libgearman-server/job.c 2010-01-20 21:27:13 +0000
77@@ -110,6 +110,9 @@
78 return NULL;
79 }
80
81+ if( server->time_order )
82+ gettimeofday(&server_job->insert_time, NULL);
83+
84 server_job->priority= priority;
85
86 server_job->function= server_function;
87@@ -218,6 +221,8 @@
88 else
89 server_job->options= 0;
90
91+ server_job->insert_time.tv_sec= 0;
92+ server_job->insert_time.tv_usec= 0;
93 server_job->retries= 0;
94 server_job->priority= 0;
95 server_job->job_handle_key= 0;
96@@ -298,12 +303,12 @@
97 return NULL;
98 }
99
100-gearman_server_job_st *
101-gearman_server_job_peek(gearman_server_con_st *server_con)
102-{
103+static gearman_server_job_st *
104+gearman_server_next_job(gearman_server_con_st *server_con) {
105 gearman_server_worker_st *server_worker;
106+ gearman_server_job_st *server_job=NULL, *job_cursor = NULL;
107 gearman_job_priority_t priority;
108-
109+
110 for (server_worker= server_con->worker_list; server_worker != NULL;
111 server_worker= server_worker->con_next)
112 {
113@@ -314,50 +319,64 @@
114 {
115 if (server_worker->function->job_list[priority] != NULL)
116 {
117- if (server_worker->function->job_list[priority]->options &
118- GEARMAN_SERVER_JOB_IGNORE)
119- {
120- /* This is only happens when a client disconnects from a foreground
121- job. We do this because we don't want to run the job anymore. */
122- server_worker->function->job_list[priority]->options&=
123- (gearman_server_job_options_t)~GEARMAN_SERVER_JOB_IGNORE;
124- gearman_server_job_free(gearman_server_job_take(server_con));
125- return gearman_server_job_peek(server_con);
126- }
127- return server_worker->function->job_list[priority];
128+ job_cursor = server_worker->function->job_list[priority];
129+ if (!server_job)
130+ {
131+ server_job = job_cursor;
132+ server_job->worker = server_worker;
133+ if( !server_job->server->time_order ) {
134+ /* if not order by time, just return on the first
135+ found job */
136+ return server_job;
137+ }
138+ }
139+ else if (job_cursor->priority < server_job->priority ||
140+ (job_cursor->priority == server_job->priority &&
141+ job_cursor->insert_time.tv_sec < server_job->insert_time.tv_sec) ||
142+ (job_cursor->priority == server_job->priority &&
143+ job_cursor->insert_time.tv_sec == server_job->insert_time.tv_sec &&
144+ job_cursor->insert_time.tv_usec < server_job->insert_time.tv_usec))
145+ {
146+ /* job is either higher priority or has been sitting in the queue longer,
147+ so do it first */
148+ server_job = job_cursor;
149+ server_job->worker = server_worker;
150+ continue;
151+ }
152 }
153 }
154 }
155 }
156+ return server_job;
157+}
158
159- return NULL;
160+gearman_server_job_st *
161+gearman_server_job_peek(gearman_server_con_st *server_con)
162+{
163+ gearman_server_job_st * server_job = gearman_server_next_job(server_con);
164+ if(!server_job)
165+ return NULL;
166+ if (server_job->options & GEARMAN_SERVER_JOB_IGNORE)
167+ {
168+ server_job->options &= (gearman_server_job_options_t)~GEARMAN_SERVER_JOB_IGNORE;
169+ gearman_server_job_free(gearman_server_job_take(server_con));
170+ return gearman_server_job_peek(server_con);
171+ }
172+ return server_job;
173 }
174
175 gearman_server_job_st *
176 gearman_server_job_take(gearman_server_con_st *server_con)
177 {
178 gearman_server_worker_st *server_worker;
179- gearman_server_job_st *server_job;
180 gearman_job_priority_t priority;
181-
182- for (server_worker= server_con->worker_list; server_worker != NULL;
183- server_worker= server_worker->con_next)
184- {
185- if (server_worker->function->job_count != 0)
186- break;
187- }
188-
189- if (server_worker == NULL)
190+ gearman_server_job_st * server_job = gearman_server_next_job(server_con);
191+ if(!server_job)
192 return NULL;
193
194- for (priority= GEARMAN_JOB_PRIORITY_HIGH;
195- priority != GEARMAN_JOB_PRIORITY_MAX; priority++)
196- {
197- if (server_worker->function->job_list[priority] != NULL)
198- break;
199- }
200+ server_worker = server_job->worker;
201+ priority = server_job->priority;
202
203- server_job= server_worker->function->job_list[priority];
204 server_job->function->job_list[priority]= server_job->function_next;
205 if (server_job->function->job_end[priority] == server_job)
206 server_job->function->job_end[priority]= NULL;
207
208=== modified file 'libgearman-server/job.h'
209--- libgearman-server/job.h 2009-12-30 19:17:03 +0000
210+++ libgearman-server/job.h 2010-01-20 21:27:13 +0000
211@@ -53,6 +53,7 @@
212 gearman_server_worker_st *worker;
213 char job_handle[GEARMAN_JOB_HANDLE_SIZE];
214 char unique[GEARMAN_UNIQUE_SIZE];
215+ struct timeval insert_time;
216 };
217
218 /**
219
220=== modified file 'libgearman-server/server.c'
221--- libgearman-server/server.c 2010-01-05 18:45:42 +0000
222+++ libgearman-server/server.c 2010-01-20 21:27:13 +0000
223@@ -101,6 +101,7 @@
224 server->free_worker_list= NULL;
225 server->log_fn= NULL;
226 server->log_context= NULL;
227+ server->time_order= 0;
228 server->queue_context= NULL;
229 server->queue_add_fn= NULL;
230 server->queue_flush_fn= NULL;
231@@ -207,6 +208,12 @@
232 gearman_set_log_fn(server->gearman, _log, server, verbose);
233 }
234
235+void gearman_server_set_time_order(gearman_server_st *server,
236+ uint8_t time_order)
237+{
238+ server->time_order= time_order;
239+}
240+
241 gearman_return_t gearman_server_run_command(gearman_server_con_st *server_con,
242 gearman_packet_st *packet)
243 {
244
245=== modified file 'libgearman-server/server.h'
246--- libgearman-server/server.h 2010-01-02 19:57:26 +0000
247+++ libgearman-server/server.h 2010-01-20 21:27:13 +0000
248@@ -63,6 +63,7 @@
249 char job_handle_prefix[GEARMAN_JOB_HANDLE_SIZE];
250 gearman_server_job_st *job_hash[GEARMAN_JOB_HASH_SIZE];
251 gearman_server_job_st *unique_hash[GEARMAN_JOB_HASH_SIZE];
252+ uint8_t time_order;
253 };
254
255 /**
256@@ -117,6 +118,17 @@
257 void *context, gearman_verbose_t verbose);
258
259 /**
260+ * Set option to order jobs by time
261+ * @param server Server structure previously initialized with
262+ * gearman_server_create.
263+ * @param boolean value, 0 (default) will not order jobs by time
264+ */
265+GEARMAN_API
266+void gearman_server_set_time_order(gearman_server_st *server,
267+ uint8_t time_order);
268+
269+
270+/**
271 * Process commands for a connection.
272 * @param server_con Server connection that has a packet to process.
273 * @param packet The packet that needs processing.
274
275=== modified file 'tests/include.am'
276--- tests/include.am 2010-01-07 17:46:44 +0000
277+++ tests/include.am 2010-01-20 21:27:13 +0000
278@@ -50,7 +50,8 @@
279 tests/cpp_test \
280 tests/internals_test \
281 tests/regression_test \
282- tests/worker_test
283+ tests/worker_test \
284+ tests/time_order_test
285
286 noinst_HEADERS+= \
287 tests/test.h \
288@@ -69,6 +70,9 @@
289 tests_worker_test_SOURCES= tests/worker_test.c
290 tests_worker_test_LDADD= ${TEST_LDADD}
291
292+tests_time_order_test_SOURCES= tests/time_order_test.c
293+tests_time_order_test_LDADD= ${TEST_LDADD}
294+
295 # Test linking with C++ application
296 tests_cpp_test_SOURCES= tests/cpp_test.cc
297 tests_cpp_test_LDADD= ${TEST_LDADD}
298@@ -79,6 +83,9 @@
299 test-worker:
300 tests/worker_test $(ARG1) $(ARG2)
301
302+test-time-order:
303+ tests/time_order_test $(ARG1) $(ARG2)
304+
305 test-internals:
306 tests/internals_test $(ARG1) $(ARG2)
307
308@@ -91,7 +98,7 @@
309 $(SQLITE_TEST) $(ARG1) $(ARG2)
310 $(SQLITE_RM)
311
312-check-local: test-client test-worker test-internals test-libmemcached test-sqlite
313+check-local: test-client test-worker test-time-order test-internals test-libmemcached test-sqlite
314
315 gdb-client: ${noinst_PROGRAMS}
316 $(LIBTOOL) --mode=execute gdb tests/client_test
317
318=== modified file 'tests/test_gearmand.c'
319--- tests/test_gearmand.c 2010-01-05 18:11:04 +0000
320+++ tests/test_gearmand.c 2010-01-20 21:27:13 +0000
321@@ -42,6 +42,10 @@
322 pid_t gearmand_pid;
323 gearmand_st *gearmand;
324 gearman_conf_st conf;
325+ gearman_conf_module_st module;
326+ const char *name;
327+ const char *value;
328+ uint8_t time_order = 0;
329
330 gearmand_pid= fork();
331 assert(gearmand_pid != -1);
332@@ -53,6 +57,13 @@
333
334 conf_ptr= gearman_conf_create(&conf);
335 assert(conf_ptr);
336+
337+ assert( gearman_conf_module_create(&conf, &module, NULL) );
338+
339+#define MCO(__name, __short, __value, __help) \
340+ gearman_conf_module_add_option(&module, __name, __short, __value, __help);
341+ MCO("time-order", 'T', NULL, "Dispatch jobs in order received")
342+
343 #ifdef HAVE_LIBDRIZZLE
344 ret= gearman_server_queue_libdrizzle_conf(&conf);
345 assert(ret == GEARMAN_SUCCESS);
346@@ -69,9 +80,18 @@
347 ret= gearman_conf_parse_args(&conf, argc, argv);
348 assert(ret == GEARMAN_SUCCESS);
349
350+ /* Check for option values that were given. */
351+ while (gearman_conf_module_value(&module, &name, &value))
352+ {
353+ if (!strcmp(name, "time-order"))
354+ time_order++;
355+ }
356+
357 gearmand= gearmand_create(NULL, port);
358 assert(gearmand != NULL);
359
360+ gearmand_set_time_order(gearmand, time_order);
361+
362 if (queue_type != NULL)
363 {
364 assert(argc);
365
366=== added file 'tests/time_order_test.c'
367--- tests/time_order_test.c 1970-01-01 00:00:00 +0000
368+++ tests/time_order_test.c 2010-01-20 21:27:13 +0000
369@@ -0,0 +1,197 @@
370+/* Gearman server and library
371+ * Copyright (C) 2008 Brian Aker, Eric Day
372+ * All rights reserved.
373+ *
374+ * Use and distribution licensed under the BSD license. See
375+ * the COPYING file in the parent directory for full text.
376+ */
377+
378+#include "config.h"
379+
380+#if defined(NDEBUG)
381+# undef NDEBUG
382+#endif
383+
384+#include <assert.h>
385+#include <stdio.h>
386+#include <stdlib.h>
387+#include <string.h>
388+#include <unistd.h>
389+
390+#include <libgearman/gearman.h>
391+#include "test.h"
392+#include "test_gearmand.h"
393+
394+#define WORKER_TEST_PORT 32123
395+
396+typedef struct
397+{
398+ pid_t gearmand_pid;
399+ gearman_worker_st worker;
400+ bool run_worker;
401+} worker_test_st;
402+
403+/* Prototypes */
404+test_return_t queue_add(void *object);
405+test_return_t queue_worker(void *object);
406+
407+test_return_t pre(void *object);
408+test_return_t post(void *object);
409+
410+void *world_create(test_return_t *error);
411+test_return_t world_destroy(void *object);
412+
413+/* append test for worker */
414+static void *append_function(gearman_job_st *job __attribute__((unused)),
415+ void *context, size_t *result_size,
416+ gearman_return_t *ret_ptr __attribute__((unused)))
417+{
418+ /* this will will set the last char in the context (buffer) to the */
419+ /* first char of the work */
420+ char * buf = (char *)context;
421+ char * work = (char *)gearman_job_workload(job);
422+ buf += strlen(buf);
423+ *buf = *work;
424+ *result_size= 0;
425+ return NULL;
426+}
427+
428+test_return_t queue_add(void *object)
429+{
430+ worker_test_st *test= (worker_test_st *)object;
431+ gearman_client_st client;
432+ char job_handle[GEARMAN_JOB_HANDLE_SIZE];
433+
434+ uint8_t * value= (uint8_t *)strdup("0");
435+ size_t value_length= 1;
436+ uint8_t i;
437+
438+ test->run_worker= false;
439+
440+ if (gearman_client_create(&client) == NULL)
441+ return TEST_FAILURE;
442+
443+ if (gearman_client_add_server(&client, NULL,
444+ WORKER_TEST_PORT) != GEARMAN_SUCCESS)
445+ {
446+ return TEST_FAILURE;
447+ }
448+
449+ /* send strings "0", "1" ... "9" to alternating between 2 queues */
450+ /* queue1 = 1,3,5,7,9 */
451+ /* queue2 = 0,2,4,6,8 */
452+ for (i=0; i<10; i++) {
453+ if (gearman_client_do_background(&client, i % 2 ? "queue1" : "queue2", NULL, value,
454+ value_length, job_handle) != GEARMAN_SUCCESS)
455+ {
456+ return TEST_FAILURE;
457+ }
458+ *value = (uint8_t)(*value + 1);
459+ }
460+
461+ gearman_client_free(&client);
462+ free(value);
463+
464+ test->run_worker= true;
465+ return TEST_SUCCESS;
466+}
467+
468+test_return_t queue_worker(void *object)
469+{
470+ worker_test_st *test= (worker_test_st *)object;
471+ gearman_worker_st *worker= &(test->worker);
472+ char buffer[11];
473+ int i;
474+ memset(buffer, 0, 11);
475+
476+ if (!test->run_worker)
477+ return TEST_FAILURE;
478+
479+ if (gearman_worker_add_function(worker, "queue1", 5, append_function,
480+ buffer) != GEARMAN_SUCCESS)
481+ {
482+ return TEST_FAILURE;
483+ }
484+
485+ if (gearman_worker_add_function(worker, "queue2", 5, append_function,
486+ buffer) != GEARMAN_SUCCESS)
487+ {
488+ return TEST_FAILURE;
489+ }
490+
491+ for (i=0; i<10; i++) {
492+ if (gearman_worker_work(worker) != GEARMAN_SUCCESS)
493+ return TEST_FAILURE;
494+ }
495+
496+ // expect buffer to be reassembled in order
497+ // of insert, not the default "sticky queue" order
498+ if( strcmp(buffer, "0123456789") )
499+ return TEST_FAILURE;
500+
501+ return TEST_SUCCESS;
502+}
503+
504+
505+void *world_create(test_return_t *error)
506+{
507+ worker_test_st *test;
508+ const char *argv[2]= { "test_gearmand", "--time-order"};
509+ pid_t gearmand_pid;
510+
511+ gearmand_pid= test_gearmand_start(WORKER_TEST_PORT, NULL, (char **)argv, 2);
512+
513+ test= malloc(sizeof(worker_test_st));
514+ if (! test)
515+ {
516+ *error= TEST_MEMORY_ALLOCATION_FAILURE;
517+ return NULL;
518+ }
519+
520+ memset(test, 0, sizeof(worker_test_st));
521+ if (gearman_worker_create(&(test->worker)) == NULL)
522+ {
523+ *error= TEST_FAILURE;
524+ return NULL;
525+ }
526+
527+ if (gearman_worker_add_server(&(test->worker), NULL, WORKER_TEST_PORT) != GEARMAN_SUCCESS)
528+ {
529+ *error= TEST_FAILURE;
530+ return NULL;
531+ }
532+
533+ test->gearmand_pid= gearmand_pid;
534+
535+ *error= TEST_SUCCESS;
536+
537+ return (void *)test;
538+}
539+
540+test_return_t world_destroy(void *object)
541+{
542+ worker_test_st *test= (worker_test_st *)object;
543+ gearman_worker_free(&(test->worker));
544+ test_gearmand_stop(test->gearmand_pid);
545+ free(test);
546+
547+ return TEST_SUCCESS;
548+}
549+
550+test_st tests[] ={
551+ {"add", 0, queue_add },
552+ {"worker", 0, queue_worker },
553+ {0, 0, 0}
554+};
555+
556+collection_st collection[] ={
557+ {"time_order", 0, 0, tests},
558+ {0, 0, 0, 0}
559+};
560+
561+void get_world(world_st *world)
562+{
563+ world->collections= collection;
564+ world->create= world_create;
565+ world->destroy= world_destroy;
566+}

Subscribers

People subscribed via source and target branches

to all changes: