Merge lp:~clint-fewbar/gearmand/round-robin2 into lp:gearmand/1.0

Proposed by Clint Byrum on 2010-02-02
Status: Merged
Merged at revision: not available
Proposed branch: lp:~clint-fewbar/gearmand/round-robin2
Merge into: lp:gearmand/1.0
Diff against target: 150 lines (+60/-1)
5 files modified
gearmand/gearmand.c (+7/-0)
libgearman-server/constants.h (+2/-1)
libgearman-server/gearmand.c (+11/-0)
libgearman-server/gearmand.h (+10/-0)
libgearman-server/job.c (+30/-0)
To merge this branch: bzr merge lp:~clint-fewbar/gearmand/round-robin2
Reviewer Review Type Date Requested Status
Gearman-developers 2010-02-02 Pending
Review via email: mp+18426@code.launchpad.net
To post a comment you must log in.
Clint Byrum (clint-fewbar) wrote :

Adds -R/--round-robin for round-robin job assignment per connection

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'gearmand/gearmand.c'
2--- gearmand/gearmand.c 2010-01-28 21:45:14 +0000
3+++ gearmand/gearmand.c 2010-02-02 00:49:09 +0000
4@@ -118,6 +118,7 @@
5 gearmand_log_info_st log_info;
6 bool close_stdio= false;
7 int fd;
8+ bool round_robin= false;
9
10 log_info.file= NULL;
11 log_info.fd= -1;
12@@ -157,6 +158,9 @@
13 MCO("port", 'p', "PORT", "Port the server should listen on.")
14 MCO("pid-file", 'P', "FILE", "File to write process ID out to.")
15 MCO("protocol", 'r', "PROTOCOL", "Load protocol module.")
16+ MCO("round-robin", 'R', NULL, "Assign work in round-robin order per worker"
17+ "connection. The default is to assign work in the order of functions "
18+ "added by the worker.")
19 MCO("queue-type", 'q', "QUEUE", "Persistent queue type to use.")
20 MCO("threads", 't', "THREADS", "Number of I/O threads to use. Default=0.")
21 MCO("user", 'u', "USER", "Switch to given user after startup.")
22@@ -294,6 +298,8 @@
23 user= value;
24 else if (!strcmp(name, "verbose"))
25 verbose++;
26+ else if (!strcmp(name, "round-robin"))
27+ round_robin++;
28 else if (!strcmp(name, "version"))
29 printf("\ngearmand %s - %s\n", gearman_version(), gearman_bugreport());
30 else if (!strcmp(name, "worker-wakeup"))
31@@ -351,6 +357,7 @@
32 gearmand_set_job_retries(_gearmand, job_retries);
33 gearmand_set_worker_wakeup(_gearmand, worker_wakeup);
34 gearmand_set_log_fn(_gearmand, _log, &log_info, verbose);
35+ gearmand_set_round_robin(_gearmand, round_robin);
36
37 if (queue_type != NULL)
38 {
39
40=== modified file 'libgearman-server/constants.h'
41--- libgearman-server/constants.h 2009-12-22 21:21:21 +0000
42+++ libgearman-server/constants.h 2010-02-02 00:49:09 +0000
43@@ -49,7 +49,8 @@
44 {
45 GEARMAN_SERVER_ALLOCATED= (1 << 0),
46 GEARMAN_SERVER_PROC_THREAD= (1 << 1),
47- GEARMAN_SERVER_QUEUE_REPLAY= (1 << 2)
48+ GEARMAN_SERVER_QUEUE_REPLAY= (1 << 2),
49+ GEARMAN_SERVER_RR_ORDER= (1 << 3),
50 } gearman_server_options_t;
51
52 /**
53
54=== modified file 'libgearman-server/gearmand.c'
55--- libgearman-server/gearmand.c 2010-01-28 21:45:14 +0000
56+++ libgearman-server/gearmand.c 2010-02-02 00:49:09 +0000
57@@ -280,6 +280,17 @@
58 GEARMAN_ERROR(gearmand, "gearmand_wakeup:write:%d", errno)
59 }
60
61+void gearmand_set_round_robin(gearmand_st *gearmand, bool round_robin)
62+{
63+ if (round_robin)
64+ {
65+ gearmand->server.options|= GEARMAN_SERVER_RR_ORDER;
66+ return;
67+ }
68+ gearmand->server.options&= GEARMAN_SERVER_RR_ORDER;
69+}
70+
71+
72 /*
73 * Private definitions
74 */
75
76=== modified file 'libgearman-server/gearmand.h'
77--- libgearman-server/gearmand.h 2010-01-28 21:45:14 +0000
78+++ libgearman-server/gearmand.h 2010-02-02 00:49:09 +0000
79@@ -175,6 +175,16 @@
80 GEARMAN_API
81 void gearmand_wakeup(gearmand_st *gearmand, gearmand_wakeup_t wakeup);
82
83+/**
84+ * Sets the round-robin mode on the server object. RR will distribute work
85+ * fairly among every function assigned to a worker, instead of draining
86+ * each function before moving on to the next.
87+ * @param gearmand Server instance previously initialized
88+ * @param bool true=round robin is used, false=round robin is not used
89+ */
90+GEARMAN_API
91+void gearmand_set_round_robin(gearmand_st *gearmand, bool round_robin);
92+
93 /** @} */
94
95 #ifdef __cplusplus
96
97=== modified file 'libgearman-server/job.c'
98--- libgearman-server/job.c 2010-01-28 21:45:14 +0000
99+++ libgearman-server/job.c 2010-02-02 00:49:09 +0000
100@@ -29,6 +29,12 @@
101 static uint32_t _server_job_hash(const char *key, size_t key_size);
102
103 /**
104+ * Appends a worker onto the end of a list of workers.
105+ */
106+static inline void _server_con_worker_list_append(gearman_server_worker_st *list,
107+ gearman_server_worker_st *worker);
108+
109+/**
110 * Get a server job structure from the unique ID. If data_size is non-zero,
111 * then unique points to the workload data and not a real unique key.
112 */
113@@ -333,6 +339,20 @@
114 return NULL;
115 }
116
117+static inline void _server_con_worker_list_append(gearman_server_worker_st *list,
118+ gearman_server_worker_st *worker)
119+{
120+ worker->con_prev= NULL;
121+ worker->con_next= list;
122+ while (worker->con_next != NULL)
123+ {
124+ worker->con_prev= worker->con_next;
125+ worker->con_next= worker->con_next->con_next;
126+ }
127+ if (worker->con_prev)
128+ worker->con_prev->con_next= worker;
129+}
130+
131 gearman_server_job_st *
132 gearman_server_job_take(gearman_server_con_st *server_con)
133 {
134@@ -350,6 +370,16 @@
135 if (server_worker == NULL)
136 return NULL;
137
138+ if (server_con->thread->server->options & GEARMAN_SERVER_RR_ORDER)
139+ {
140+ GEARMAN_LIST_DEL(server_con->worker, server_worker, con_)
141+ _server_con_worker_list_append(server_con->worker_list, server_worker);
142+ ++server_con->worker_count;
143+ if (server_con->worker_list == NULL) {
144+ server_con->worker_list= server_worker;
145+ }
146+ }
147+
148 for (priority= GEARMAN_JOB_PRIORITY_HIGH;
149 priority != GEARMAN_JOB_PRIORITY_MAX; priority++)
150 {

Subscribers

People subscribed via source and target branches