Merge lp:~j-pureftpd/gearmand/tokyo into lp:gearmand/1.0
- tokyo
- Merge into 1.0
Status: | Merged |
---|---|
Merged at revision: | not available |
Proposed branch: | lp:~j-pureftpd/gearmand/tokyo |
Merge into: | lp:gearmand/1.0 |
Diff against target: |
644 lines (+550/-0) 6 files modified
configure.ac (+2/-0) gearmand/gearmand.c (+25/-0) libgearman-server/include.am (+8/-0) libgearman-server/queue_libtokyocabinet.c (+418/-0) libgearman-server/queue_libtokyocabinet.h (+62/-0) m4/pandora_have_libtokyocabinet.m4.moved (+35/-0) |
To merge this branch: | bzr merge lp:~j-pureftpd/gearmand/tokyo |
Related bugs: | |
Related blueprints: |
Persistent queue module using Tokyo Cabinet
(Undefined)
|
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Frank Denis (community) | Needs Resubmitting | ||
Eric Day (community) | Needs Fixing | ||
Review via email: mp+9049@code.launchpad.net |
Commit message
Description of the change
Frank Denis (j-pureftpd) wrote : | # |
Eric Day (eday) wrote : | # |
> I'll keep working on it, and maybe add support for Tokyo Tyrant.
This is great! I just have a few things when looking over the branch:
* Make GEARMAN_
* Support priority. Right now it looks like this is just ignored, but it should be an easy fix to encode this into the key or pack into the value somehow. It would be nice to have the interface fully supported.
Frank Denis (j-pureftpd) wrote : | # |
Hi Eric,
The prefix has been removed as it actually doesn't make any sense for a Tokyo Cabinet database.
Also, the storage is now using a btree instead of a hash table and properly supports priorities.
- 83. By j <j@linux>
-
Sync Tokyo branch with trunk #228
- 84. By j <j@linux>
-
Untab
- 85. By j <j@linux>
-
Spaces
- 86. By j <j@linux>
-
Switch Tokyocabinet databases to btrees instead of hashes.
This is slower and requires more storage space, but eventually it will
bring ordered processing and priority support. - 87. By j <j@linux>
-
Prefixes don't make any sense with Tokyocabinet.
- 88. By j <j@linux>
-
Change the database format in order to handle priorities.
+ a bunch of small fixes. - 89. By j <j@linux>
-
Fix computation of the function data length.
- 90. By j <j@linux>
-
Don't ignore errors
- 91. By j <j@linux>
-
Calling tcbdboptimize() after destacking the queue is a good way to
save space. - 92. By Jedi/Sector One <j@ubuntu>
-
Sync with trunk#229
- 93. By j <j@linux>
-
Sync with trunk#230
- 94. By j <j@linux>
-
Sync with trunk#270
- 95. By j <j@linux>
-
Sync with trunk#309
- 96. By j <j@linux>
-
Sync to trunk #315
- 97. By j <j@linux>
-
Merge Clint's changes
Frank Denis (j-pureftpd) wrote : | # |
The branch is kept up to date with the trunk.
May you review and merge it?
Clint Byrum (clint-fewbar) wrote : | # |
I'd like to see it merged as well, as we're about to start using it in production and I'd rather not see it "fork" in any way.
One thing that needs to happen first though, is m5/pandora_
Preview Diff
1 | === modified file 'configure.ac' |
2 | --- configure.ac 2009-12-30 19:13:05 +0000 |
3 | +++ configure.ac 2010-01-21 12:07:13 +0000 |
4 | @@ -40,6 +40,7 @@ |
5 | PANDORA_HAVE_LIBMEMCACHED |
6 | PANDORA_HAVE_MEMCACHED |
7 | PANDORA_HAVE_LIBPQ |
8 | +PANDORA_HAVE_LIBTOKYOCABINET |
9 | |
10 | |
11 | AC_CHECK_HEADERS(errno.h fcntl.h getopt.h netinet/tcp.h pwd.h signal.h) |
12 | @@ -72,5 +73,6 @@ |
13 | echo " * Building with libdrizzle $ac_cv_libdrizzle" |
14 | echo " * Building with libmemcached $ac_cv_libmemcached" |
15 | echo " * Building with libpq $ac_cv_libpq" |
16 | +echo " * Building with tokyocabinet $ac_cv_libtokyocabinet" |
17 | echo "" |
18 | echo "---" |
19 | |
20 | === modified file 'gearmand/gearmand.c' |
21 | --- gearmand/gearmand.c 2009-12-02 05:22:18 +0000 |
22 | +++ gearmand/gearmand.c 2010-01-21 12:07:13 +0000 |
23 | @@ -71,6 +71,10 @@ |
24 | #include <libgearman-server/queue_libpq.h> |
25 | #endif |
26 | |
27 | +#ifdef HAVE_LIBTOKYOCABINET |
28 | +#include <libgearman-server/queue_libtokyocabinet.h> |
29 | +#endif |
30 | + |
31 | #include <libgearman-server/protocol_http.h> |
32 | |
33 | #define GEARMAND_LOG_REOPEN_TIME 60 |
34 | @@ -189,6 +193,14 @@ |
35 | return 1; |
36 | } |
37 | #endif |
38 | +#ifdef HAVE_LIBTOKYOCABINET |
39 | + if (gearman_server_queue_libtokyocabinet_conf(&conf) != GEARMAN_SUCCESS) |
40 | + { |
41 | + fprintf(stderr, "gearmand: gearman_queue_libtokyocabinet_conf: %s\n", |
42 | + gearman_conf_error(&conf)); |
43 | + return 1; |
44 | + } |
45 | +#endif |
46 | |
47 | #ifdef HAVE_LIBSQLITE3 |
48 | if (gearman_server_queue_libsqlite3_conf(&conf) != GEARMAN_SUCCESS) |
49 | @@ -378,6 +390,15 @@ |
50 | } |
51 | else |
52 | #endif |
53 | +#ifdef HAVE_LIBTOKYOCABINET |
54 | + if (!strcmp(queue_type, "libtokyocabinet")) |
55 | + { |
56 | + ret= gearmand_queue_libtokyocabinet_init(_gearmand, &conf); |
57 | + if (ret != GEARMAN_SUCCESS) |
58 | + return 1; |
59 | + } |
60 | + else |
61 | +#endif |
62 | { |
63 | fprintf(stderr, "gearmand: Unknown queue module: %s\n", queue_type); |
64 | return 1; |
65 | @@ -422,6 +443,10 @@ |
66 | if (!strcmp(queue_type, "libpq")) |
67 | gearmand_queue_libpq_deinit(_gearmand); |
68 | #endif |
69 | +#ifdef HAVE_LIBTOKYOCABINET |
70 | + if (!strcmp(queue_type, "libtokyocabinet")) |
71 | + gearmand_queue_libtokyocabinet_deinit(_gearmand); |
72 | +#endif |
73 | } |
74 | |
75 | while (gearman_conf_module_value(&module, &name, &value)) |
76 | |
77 | === modified file 'libgearman-server/include.am' |
78 | --- libgearman-server/include.am 2009-12-31 00:20:39 +0000 |
79 | +++ libgearman-server/include.am 2010-01-21 12:07:13 +0000 |
80 | @@ -30,6 +30,11 @@ |
81 | QUEUE_LIBPQ_C= libgearman-server/queue_libpq.c |
82 | endif |
83 | |
84 | +if HAVE_LIBTOKYOCABINET |
85 | +QUEUE_LIBTOKYOCABINET_H= libgearman-server/queue_libtokyocabinet.h |
86 | +QUEUE_LIBTOKYOCABINET_C= libgearman-server/queue_libtokyocabinet.c |
87 | +endif |
88 | + |
89 | noinst_LTLIBRARIES+= libgearman-server/libgearman-server.la |
90 | |
91 | |
92 | @@ -38,6 +43,7 @@ |
93 | $(QUEUE_LIBMEMCACHED_H) \ |
94 | $(QUEUE_LIBPQ_H) \ |
95 | $(QUEUE_LIBSQLITE3_H) \ |
96 | + $(QUEUE_LIBTOKYOCABINET_H) \ |
97 | libgearman-server/client.h \ |
98 | libgearman-server/conf.h \ |
99 | libgearman-server/conf_module.h \ |
100 | @@ -62,6 +68,7 @@ |
101 | $(QUEUE_LIBMEMCACHED_C) \ |
102 | $(QUEUE_LIBPQ_C) \ |
103 | $(QUEUE_LIBSQLITE3_C) \ |
104 | + $(QUEUE_LIBTOKYOCABINET_C) \ |
105 | libgearman-server/client.c \ |
106 | libgearman-server/conf.c \ |
107 | libgearman-server/conf_module.c \ |
108 | @@ -87,5 +94,6 @@ |
109 | $(LTLIBMEMCACHED) \ |
110 | $(LTLIBPQ) \ |
111 | $(LTLIBSQLITE3) \ |
112 | + $(LTLIBTOKYOCABINET) \ |
113 | libgearman/libgearman.la \ |
114 | libgearman/libgearmancore.la |
115 | |
116 | === added file 'libgearman-server/queue_libtokyocabinet.c' |
117 | --- libgearman-server/queue_libtokyocabinet.c 1970-01-01 00:00:00 +0000 |
118 | +++ libgearman-server/queue_libtokyocabinet.c 2010-01-21 12:07:13 +0000 |
119 | @@ -0,0 +1,418 @@ |
120 | +/** |
121 | + * @file |
122 | + * @brief Tokyo Cabinet Queue Storage Definitions |
123 | + */ |
124 | + |
125 | +#include "common.h" |
126 | + |
127 | +#include <libgearman-server/queue_libtokyocabinet.h> |
128 | +#include <tcutil.h> |
129 | +#include <tcbdb.h> |
130 | + |
131 | +/** |
132 | + * @addtogroup gearman_queue_libtokyocabinet libtokyocabinet Queue Storage Functions |
133 | + * @ingroup gearman_queue |
134 | + * @{ |
135 | + */ |
136 | + |
137 | +/* |
138 | + * Private declarations |
139 | + */ |
140 | + |
141 | +/** |
142 | + * Structure for libtokyocabinet specific data. |
143 | + */ |
144 | +typedef struct |
145 | +{ |
146 | + TCBDB *db; |
147 | +} gearman_queue_libtokyocabinet_st; |
148 | + |
149 | +/* Queue callback functions. */ |
150 | +static gearman_return_t _libtokyocabinet_add(gearman_server_st *server, void *context, |
151 | + const void *unique, |
152 | + size_t unique_size, |
153 | + const void *function_name, |
154 | + size_t function_name_size, |
155 | + const void *data, size_t data_size, |
156 | + gearman_job_priority_t priority); |
157 | +static gearman_return_t _libtokyocabinet_flush(gearman_server_st *server, void *context); |
158 | +static gearman_return_t _libtokyocabinet_done(gearman_server_st *server, void *context, |
159 | + const void *unique, |
160 | + size_t unique_size, |
161 | + const void *function_name, |
162 | + size_t function_name_size); |
163 | +static gearman_return_t _libtokyocabinet_replay(gearman_server_st *server, void *context, |
164 | + gearman_queue_add_fn *add_fn, |
165 | + void *add_context); |
166 | + |
167 | +/* |
168 | + * Public definitions |
169 | + */ |
170 | + |
171 | +gearman_return_t gearman_server_queue_libtokyocabinet_conf(gearman_conf_st *conf) |
172 | +{ |
173 | + gearman_conf_module_st *module; |
174 | + |
175 | + module= gearman_conf_module_create(conf, NULL, "libtokyocabinet"); |
176 | + if (module == NULL) |
177 | + return GEARMAN_MEMORY_ALLOCATION_FAILURE; |
178 | + |
179 | + gearman_conf_module_add_option(module, "file", 0, "FILE_NAME", |
180 | + "File name of the database."); |
181 | + gearman_conf_module_add_option(module, "optimize", 0, "yes/no", |
182 | + "Optimize database on open. [default=yes]"); |
183 | + return gearman_conf_return(conf); |
184 | +} |
185 | + |
186 | +gearman_return_t gearman_queue_libtokyocabinet_init(gearman_server_st *server, |
187 | + gearman_conf_st *conf) |
188 | +{ |
189 | + gearman_queue_libtokyocabinet_st *queue; |
190 | + gearman_conf_module_st *module; |
191 | + const char *name; |
192 | + const char *value; |
193 | + const char *opt_file= NULL; |
194 | + const char *opt_optimize= NULL; |
195 | + |
196 | + GEARMAN_SERVER_INFO(server, "Initializing libtokyocabinet module") |
197 | + |
198 | + queue= calloc(1, sizeof(gearman_queue_libtokyocabinet_st)); |
199 | + if (queue == NULL) |
200 | + { |
201 | + GEARMAN_SERVER_ERROR_SET(server, "gearman_queue_libtokyocabinet_init", "malloc") |
202 | + return GEARMAN_MEMORY_ALLOCATION_FAILURE; |
203 | + } |
204 | + |
205 | + if ((queue->db= tcbdbnew()) == NULL) |
206 | + { |
207 | + free(queue); |
208 | + GEARMAN_SERVER_ERROR_SET(server, "gearman_queue_libtokyocabinet_init", |
209 | + "tcbdbnew") |
210 | + return GEARMAN_QUEUE_ERROR; |
211 | + } |
212 | + |
213 | + tcbdbsetdfunit(queue->db, 8); |
214 | + |
215 | + /* Get module and parse the option values that were given. */ |
216 | + module= gearman_conf_module_find(conf, "libtokyocabinet"); |
217 | + if (module == NULL) |
218 | + { |
219 | + GEARMAN_SERVER_ERROR_SET(server, "gearman_queue_libtokyocabinet_init", |
220 | + "modconf_module_find:NULL") |
221 | + return GEARMAN_QUEUE_ERROR; |
222 | + } |
223 | + |
224 | + while (gearman_conf_module_value(module, &name, &value)) |
225 | + { |
226 | + if (!strcmp(name, "file")) |
227 | + opt_file= value; |
228 | + else if (!strcmp(name, "optimize")) |
229 | + opt_optimize= value; |
230 | + else |
231 | + { |
232 | + tcbdbdel(queue->db); |
233 | + free(queue); |
234 | + GEARMAN_SERVER_ERROR_SET(server, "gearman_queue_libtokyocabinet_init", |
235 | + "Unknown argument: %s", name) |
236 | + return GEARMAN_QUEUE_ERROR; |
237 | + } |
238 | + } |
239 | + |
240 | + if (opt_file == NULL) |
241 | + { |
242 | + GEARMAN_SERVER_ERROR_SET(server, "gearman_queue_libtokyocabinet_init", |
243 | + "No --file given") |
244 | + return GEARMAN_QUEUE_ERROR; |
245 | + } |
246 | + |
247 | +#ifdef HAVE_EVENT_BASE_NEW |
248 | + if (!tcbdbsetmutex(queue->db)) |
249 | + { |
250 | + tcbdbdel(queue->db); |
251 | + free(queue); |
252 | + |
253 | + GEARMAN_SERVER_ERROR_SET(server, "gearman_queue_libtokyocabinet_init", |
254 | + "tcbdbsetmutex") |
255 | + return GEARMAN_QUEUE_ERROR; |
256 | + } |
257 | +#endif |
258 | + |
259 | + if (!tcbdbopen(queue->db, opt_file, BDBOWRITER | BDBOCREAT)) |
260 | + { |
261 | + tcbdbdel(queue->db); |
262 | + free(queue); |
263 | + |
264 | + GEARMAN_SERVER_ERROR_SET(server, "gearman_queue_libtokyocabinet_init", |
265 | + "tcbdbopen") |
266 | + |
267 | + return GEARMAN_QUEUE_ERROR; |
268 | + } |
269 | + |
270 | + if (opt_optimize == NULL || !strcasecmp(opt_optimize, "yes")) |
271 | + { |
272 | + GEARMAN_SERVER_INFO(server, "libtokyocabinet optimizing database file"); |
273 | + if (!tcbdboptimize(queue->db, 0, 0, 0, -1, -1, UINT8_MAX)) |
274 | + { |
275 | + tcbdbdel(queue->db); |
276 | + free(queue); |
277 | + GEARMAN_SERVER_ERROR_SET(server, "gearman_queue_libtokyocabinet_init", |
278 | + "tcbdboptimize") |
279 | + |
280 | + return GEARMAN_QUEUE_ERROR; |
281 | + } |
282 | + } |
283 | + |
284 | + gearman_server_set_queue_context(server, queue); |
285 | + |
286 | + gearman_server_set_queue_add_fn(server, _libtokyocabinet_add); |
287 | + gearman_server_set_queue_flush_fn(server, _libtokyocabinet_flush); |
288 | + gearman_server_set_queue_done_fn(server, _libtokyocabinet_done); |
289 | + gearman_server_set_queue_replay_fn(server, _libtokyocabinet_replay); |
290 | + |
291 | + return GEARMAN_SUCCESS; |
292 | +} |
293 | + |
294 | +gearman_return_t gearman_queue_libtokyocabinet_deinit(gearman_server_st *server) |
295 | +{ |
296 | + gearman_queue_libtokyocabinet_st *queue; |
297 | + |
298 | + GEARMAN_SERVER_INFO(server, "Shutting down libtokyocabinet queue module"); |
299 | + |
300 | + queue= (gearman_queue_libtokyocabinet_st *)gearman_server_queue_context(server); |
301 | + gearman_server_set_queue_context(server, NULL); |
302 | + tcbdbdel(queue->db); |
303 | + |
304 | + free(queue); |
305 | + |
306 | + return GEARMAN_SUCCESS; |
307 | +} |
308 | + |
309 | +gearman_return_t gearmand_queue_libtokyocabinet_init(gearmand_st *gearmand, |
310 | + gearman_conf_st *conf) |
311 | +{ |
312 | + return gearman_queue_libtokyocabinet_init(&(gearmand->server), conf); |
313 | +} |
314 | + |
315 | +gearman_return_t gearmand_queue_libtokyocabinet_deinit(gearmand_st *gearmand) |
316 | +{ |
317 | + return gearman_queue_libtokyocabinet_deinit(&(gearmand->server)); |
318 | +} |
319 | + |
320 | +/* |
321 | + * Private definitions |
322 | + */ |
323 | + |
324 | +static gearman_return_t _libtokyocabinet_add(gearman_server_st *server, void *context, |
325 | + const void *unique, |
326 | + size_t unique_size, |
327 | + const void *function_name, |
328 | + size_t function_name_size, |
329 | + const void *data, size_t data_size, |
330 | + gearman_job_priority_t priority) |
331 | +{ |
332 | + gearman_queue_libtokyocabinet_st *queue= (gearman_queue_libtokyocabinet_st *)context; |
333 | + bool rc; |
334 | + TCXSTR *key; |
335 | + TCXSTR *job_data; |
336 | + |
337 | + GEARMAN_SERVER_DEBUG(server, "libtokyocabinet add: %.*s", |
338 | + (uint32_t)unique_size, (char *)unique); |
339 | + |
340 | + char key_str[GEARMAN_QUEUE_TOKYOCABINET_MAX_KEY_LEN]; |
341 | + size_t key_length= (size_t)snprintf(key_str, GEARMAN_QUEUE_TOKYOCABINET_MAX_KEY_LEN, "%.*s-%.*s", |
342 | + (int)function_name_size, |
343 | + (const char *)function_name, (int)unique_size, |
344 | + (const char *)unique); |
345 | + |
346 | + key= tcxstrnew(); |
347 | + tcxstrcat(key, key_str, (int)key_length); |
348 | + |
349 | + GEARMAN_SERVER_DEBUG(server, "libtokyocabinet key: %.*s", (int)key_length, key_str); |
350 | + |
351 | + job_data= tcxstrnew(); |
352 | + |
353 | + tcxstrcat(job_data, (const char *)function_name, (int)function_name_size); |
354 | + tcxstrcat(job_data, "\0", 1); |
355 | + tcxstrcat(job_data, (const char *)unique, (int)unique_size); |
356 | + tcxstrcat(job_data, "\0", 1); |
357 | + |
358 | + switch (priority) |
359 | + { |
360 | + case GEARMAN_JOB_PRIORITY_HIGH: |
361 | + case GEARMAN_JOB_PRIORITY_MAX: |
362 | + tcxstrcat2(job_data,"0"); |
363 | + break; |
364 | + case GEARMAN_JOB_PRIORITY_LOW: |
365 | + tcxstrcat2(job_data,"2"); |
366 | + break; |
367 | + case GEARMAN_JOB_PRIORITY_NORMAL: |
368 | + default: |
369 | + tcxstrcat2(job_data,"1"); |
370 | + } |
371 | + |
372 | + tcxstrcat(job_data, (const char *)data, (int)data_size); |
373 | + |
374 | + rc= tcbdbput(queue->db, tcxstrptr(key), tcxstrsize(key), |
375 | + tcxstrptr(job_data), tcxstrsize(job_data)); |
376 | + |
377 | + tcxstrdel(key); |
378 | + tcxstrdel(job_data); |
379 | + |
380 | + if (!rc) |
381 | + return GEARMAN_QUEUE_ERROR; |
382 | + |
383 | + return GEARMAN_SUCCESS; |
384 | +} |
385 | + |
386 | +static gearman_return_t _libtokyocabinet_flush(gearman_server_st *server, |
387 | + void *context __attribute__((unused))) |
388 | +{ |
389 | + gearman_queue_libtokyocabinet_st *queue= (gearman_queue_libtokyocabinet_st *)context; |
390 | + |
391 | + GEARMAN_SERVER_DEBUG(server, "libtokyocabinet flush"); |
392 | + if (!tcbdbsync(queue->db)) |
393 | + return GEARMAN_QUEUE_ERROR; |
394 | + |
395 | + return GEARMAN_SUCCESS; |
396 | +} |
397 | + |
398 | +static gearman_return_t _libtokyocabinet_done(gearman_server_st *server, void *context, |
399 | + const void *unique, |
400 | + size_t unique_size, |
401 | + const void *function_name, |
402 | + size_t function_name_size) |
403 | +{ |
404 | + gearman_queue_libtokyocabinet_st *queue= (gearman_queue_libtokyocabinet_st *)context; |
405 | + bool rc; |
406 | + TCXSTR *key; |
407 | + |
408 | + (void) function_name; |
409 | + (void) function_name_size; |
410 | + GEARMAN_SERVER_DEBUG(server, "libtokyocabinet add: %.*s", |
411 | + (uint32_t)unique_size, (char *)unique); |
412 | + |
413 | + char key_str[GEARMAN_QUEUE_TOKYOCABINET_MAX_KEY_LEN]; |
414 | + size_t key_length= (size_t)snprintf(key_str, GEARMAN_QUEUE_TOKYOCABINET_MAX_KEY_LEN, "%.*s-%.*s", |
415 | + (int)function_name_size, |
416 | + (const char *)function_name, (int)unique_size, |
417 | + (const char *)unique); |
418 | + |
419 | + key= tcxstrnew(); |
420 | + tcxstrcat(key, key_str, (int)key_length); |
421 | + rc= tcbdbout3(queue->db, tcxstrptr(key), tcxstrsize(key)); |
422 | + tcxstrdel(key); |
423 | + |
424 | + if (!rc) |
425 | + return GEARMAN_QUEUE_ERROR; |
426 | + |
427 | + return GEARMAN_SUCCESS; |
428 | +} |
429 | + |
430 | +static gearman_return_t _callback_for_record(gearman_server_st *server, |
431 | + TCXSTR *key, TCXSTR *data, |
432 | + gearman_queue_add_fn *add_fn, |
433 | + void *add_context) |
434 | +{ |
435 | + char *data_cstr; |
436 | + size_t data_cstr_size; |
437 | + const char *function; |
438 | + size_t function_len; |
439 | + char *unique; |
440 | + size_t unique_len; |
441 | + gearman_job_priority_t priority; |
442 | + gearman_return_t gret; |
443 | + |
444 | + GEARMAN_SERVER_DEBUG(server, "replaying: %s", (char *) tcxstrptr(key)); |
445 | + |
446 | + data_cstr= (char *)tcxstrptr(data); |
447 | + data_cstr_size= (size_t)tcxstrsize(data); |
448 | + |
449 | + function= data_cstr; |
450 | + function_len= strlen(function); |
451 | + |
452 | + unique= data_cstr+function_len+1; |
453 | + unique_len= strlen(unique); // strlen is only safe because tcxstrptr guarantees nul term |
454 | + |
455 | + // +2 for nulls |
456 | + data_cstr += unique_len+function_len+2; |
457 | + data_cstr_size -= unique_len+function_len+2; |
458 | + |
459 | + assert(unique); |
460 | + assert(unique_len); |
461 | + assert(function); |
462 | + assert(function_len); |
463 | + |
464 | + // single char for priority |
465 | + if (*data_cstr == '2') |
466 | + priority = GEARMAN_JOB_PRIORITY_LOW; |
467 | + else if (*data_cstr == '0') |
468 | + priority = GEARMAN_JOB_PRIORITY_HIGH; |
469 | + else |
470 | + priority = GEARMAN_JOB_PRIORITY_NORMAL; |
471 | + |
472 | + ++data_cstr; |
473 | + --data_cstr_size; |
474 | + |
475 | + // data is freed later so we must make a copy |
476 | + void *data_ptr= malloc(data_cstr_size); |
477 | + if (data_ptr == NULL) |
478 | + { |
479 | + return GEARMAN_QUEUE_ERROR; |
480 | + } |
481 | + memcpy(data_ptr, data_cstr, data_cstr_size); |
482 | + |
483 | + gret = (*add_fn)(server, add_context, unique, unique_len, |
484 | + function, function_len, |
485 | + data_ptr, data_cstr_size, |
486 | + priority); |
487 | + |
488 | + if (gret != GEARMAN_SUCCESS) |
489 | + { |
490 | + return gret; |
491 | + } |
492 | + return GEARMAN_SUCCESS; |
493 | +} |
494 | + |
495 | + |
496 | +static gearman_return_t _libtokyocabinet_replay(gearman_server_st *server, void *context, |
497 | + gearman_queue_add_fn *add_fn, |
498 | + void *add_context) |
499 | +{ |
500 | + gearman_queue_libtokyocabinet_st *queue= (gearman_queue_libtokyocabinet_st *)context; |
501 | + TCXSTR *key; |
502 | + TCXSTR *data; |
503 | + BDBCUR *cur; |
504 | + gearman_return_t gret; |
505 | + gearman_return_t tmp_gret; |
506 | + |
507 | + GEARMAN_SERVER_INFO(server, "libtokyocabinet replay start") |
508 | + |
509 | + cur= tcbdbcurnew(queue->db); |
510 | + if (!cur) |
511 | + return GEARMAN_QUEUE_ERROR; |
512 | + |
513 | + if (!tcbdbcurfirst(cur)) |
514 | + { |
515 | + tcbdbcurdel(cur); |
516 | + return GEARMAN_SUCCESS; |
517 | + } |
518 | + key= tcxstrnew(); |
519 | + data= tcxstrnew(); |
520 | + gret= GEARMAN_SUCCESS; |
521 | + while (tcbdbcurrec(cur, key, data)) |
522 | + { |
523 | + tmp_gret= _callback_for_record(server, key, data, add_fn, add_context); |
524 | + if (tmp_gret != GEARMAN_SUCCESS) |
525 | + { |
526 | + gret= GEARMAN_QUEUE_ERROR; |
527 | + break; |
528 | + } |
529 | + if (!tcbdbcurnext(cur)) |
530 | + break; |
531 | + } |
532 | + tcxstrdel(key); |
533 | + tcxstrdel(data); |
534 | + tcbdbcurdel(cur); |
535 | + |
536 | + return gret; |
537 | +} |
538 | |
539 | === added file 'libgearman-server/queue_libtokyocabinet.h' |
540 | --- libgearman-server/queue_libtokyocabinet.h 1970-01-01 00:00:00 +0000 |
541 | +++ libgearman-server/queue_libtokyocabinet.h 2010-01-21 12:07:13 +0000 |
542 | @@ -0,0 +1,62 @@ |
543 | +/** |
544 | + * @file |
545 | + * @brief libtokyocabinet Queue Storage Declarations |
546 | + */ |
547 | + |
548 | +#ifndef __GEARMAN_QUEUE_LIBTOKYOCABINET_H__ |
549 | +#define __GEARMAN_QUEUE_LIBTOKYOCABINET_H__ |
550 | + |
551 | +/** |
552 | + * It is unclear from tokyocabinet's public headers what, if any, limit there is. 4k seems sane. |
553 | + */ |
554 | +#define GEARMAN_QUEUE_TOKYOCABINET_MAX_KEY_LEN 4096 |
555 | + |
556 | +#ifdef __cplusplus |
557 | +extern "C" { |
558 | +#endif |
559 | + |
560 | +/** |
561 | + * @addtogroup gearman_queue_libtokyocabinet libtokyocabinet Queue Storage Functions |
562 | + * @ingroup gearman_queue |
563 | + * @{ |
564 | + */ |
565 | + |
566 | +/** |
567 | + * Get module configuration options. |
568 | + */ |
569 | +GEARMAN_API |
570 | +gearman_return_t gearman_server_queue_libtokyocabinet_conf(gearman_conf_st *conf); |
571 | + |
572 | +/** |
573 | + * Initialize the queue. |
574 | + */ |
575 | +GEARMAN_API |
576 | +gearman_return_t gearman_queue_libtokyocabinet_init(gearman_server_st *server, |
577 | + gearman_conf_st *conf); |
578 | + |
579 | +/** |
580 | + * De-initialize the queue. |
581 | + */ |
582 | +GEARMAN_API |
583 | +gearman_return_t gearman_queue_libtokyocabinet_deinit(gearman_server_st *server); |
584 | + |
585 | +/** |
586 | + * Initialize the queue for a gearmand object. |
587 | + */ |
588 | +GEARMAN_API |
589 | +gearman_return_t gearmand_queue_libtokyocabinet_init(gearmand_st *gearmand, |
590 | + gearman_conf_st *conf); |
591 | + |
592 | +/** |
593 | + * De-initialize the queue for a gearmand object. |
594 | + */ |
595 | +GEARMAN_API |
596 | +gearman_return_t gearmand_queue_libtokyocabinet_deinit(gearmand_st *server); |
597 | + |
598 | +/** @} */ |
599 | + |
600 | +#ifdef __cplusplus |
601 | +} |
602 | +#endif |
603 | + |
604 | +#endif /* __GEARMAN_QUEUE_LIBTOKYOCABINET_H__ */ |
605 | |
606 | === added file 'm4/pandora_have_libtokyocabinet.m4.moved' |
607 | --- m4/pandora_have_libtokyocabinet.m4.moved 1970-01-01 00:00:00 +0000 |
608 | +++ m4/pandora_have_libtokyocabinet.m4.moved 2010-01-21 12:07:13 +0000 |
609 | @@ -0,0 +1,35 @@ |
610 | +dnl Copyright (C) 2009 Sun Microsystems |
611 | +dnl This file is free software; Sun Microsystems |
612 | +dnl gives unlimited permission to copy and/or distribute it, |
613 | +dnl with or without modifications, as long as this notice is preserved. |
614 | + |
615 | +AC_DEFUN([_PANDORA_SEARCH_LIBTOKYOCABINET],[ |
616 | + AC_REQUIRE([AC_LIB_PREFIX]) |
617 | + |
618 | + dnl -------------------------------------------------------------------- |
619 | + dnl Check for libtokyocabinet |
620 | + dnl -------------------------------------------------------------------- |
621 | + |
622 | + AC_LIB_HAVE_LINKFLAGS(tokyocabinet,,[ |
623 | + #include <tcutil.h> |
624 | + #include <tcbdb.h> |
625 | + ], [ |
626 | + TCMAP *map; |
627 | + |
628 | + map = tcmapnew(); |
629 | + tcmapdel(map); |
630 | + ]) |
631 | + |
632 | + AM_CONDITIONAL(HAVE_LIBTOKYOCABINET, [test "x${ac_cv_libtokyocabinet}" = "xyes"]) |
633 | + |
634 | +]) |
635 | + |
636 | +AC_DEFUN([PANDORA_HAVE_LIBTOKYOCABINET],[ |
637 | + AC_REQUIRE([_PANDORA_SEARCH_LIBTOKYOCABINET]) |
638 | +]) |
639 | + |
640 | +AC_DEFUN([PANDORA_REQUIRE_LIBTOKYOCABINET],[ |
641 | + AC_REQUIRE([PANDORA_HAVE_LIBTOKYOCABINET]) |
642 | + AS_IF([test x$ac_cv_libtokyocabinet = xno], |
643 | + AC_MSG_ERROR([libtokyocabbinet is required for ${PACKAGE}])) |
644 | +]) |
I'll keep working on it, and maybe add support for Tokyo Tyrant.