Merge lp:~barry-leslie/pbxt/pbms into lp:pbxt

Proposed by Barry Leslie
Status: Merged
Merged at revision: 863
Proposed branch: lp:~barry-leslie/pbxt/pbms
Merge into: lp:pbxt
Diff against target: 1154 lines (+471/-214)
5 files modified
configure.in (+15/-0)
src/ha_pbxt.cc (+11/-14)
src/pbms.h (+165/-98)
src/pbms_enabled.cc (+250/-90)
src/pbms_enabled.h (+30/-12)
To merge this branch: bzr merge lp:~barry-leslie/pbxt/pbms
Reviewer Review Type Date Requested Status
PBXT Core Pending
Review via email: mp+30591@code.launchpad.net

Description of the change

Here is an update for PBMS compatibility. I have added a configure option --with-pbms to define PBMS_ENABLED for building a pbms enabled version.

To post a comment you must log in.

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'configure.in'
2--- configure.in 2010-07-08 11:50:48 +0000
3+++ configure.in 2010-07-21 22:08:48 +0000
4@@ -138,6 +138,21 @@
5 ENG_PLUGIN_DIR="$libdir/mysql/plugin"
6 fi
7 fi
8+
9+# ----- AC_ARG_WITH(pbms-port
10+AC_ARG_WITH(pbms,
11+ [ --with-pbms Enable PBMS support],
12+ [with_pbms="true"],
13+ [with_pbms="false"])
14+
15+if test "$with_pbms" = "true"
16+then
17+ CFLAGS="$CFLAGS -DPBMS_ENABLED"
18+ CXXFLAGS="$CXXFLAGS -DPBMS_ENABLED"
19+fi
20+
21+
22+
23 AC_SUBST(ENG_PLUGIN_DIR)
24 AC_SUBST(ENG_MYSQL_SRC)
25
26
27=== modified file 'src/ha_pbxt.cc'
28--- src/ha_pbxt.cc 2010-05-26 10:59:10 +0000
29+++ src/ha_pbxt.cc 2010-07-21 22:08:48 +0000
30@@ -1048,7 +1048,7 @@
31 * tables belonging to this engine. This in turn may require some of
32 * the stuff below (like xt_create_thread() called from pbxt_close_table()! */
33 #ifdef PBMS_ENABLED
34- pbms_finalize();
35+ pbms_finalize("PBXT");
36 #endif
37 pbxt_call_exit(self);
38 xt_exit_threading(self);
39@@ -1176,7 +1176,7 @@
40
41 #ifdef PBMS_ENABLED
42 PBMSResultRec result;
43- if (!pbms_initialize("PBXT", false, &result)) {
44+ if (!pbms_initialize("PBXT", false, true, &result, NULL)) {
45 xt_logf(XT_NT_ERROR, "pbms_initialize() Error: %s", result.mr_message);
46 goto error_2;
47 }
48@@ -1345,7 +1345,7 @@
49
50 error_3:
51 #ifdef PBMS_ENABLED
52- pbms_finalize();
53+ pbms_finalize("PBXT");
54
55 error_2:
56 #endif
57@@ -2720,7 +2720,8 @@
58
59 done:
60 #ifdef PBMS_ENABLED
61- pbms_completed(table, (err == 0));
62+ if (result.mr_had_blobs)
63+ pbms_completed(table, (err == 0));
64 #endif
65 return err;
66 }
67@@ -2797,16 +2798,11 @@
68 #ifdef PBMS_ENABLED
69 PBMSResultRec result;
70
71- err = pbms_delete_row_blobs(table, old_data, &result);
72+ err = pbms_update_row_blobs(table, old_data, new_data, &result);
73 if (err) {
74- xt_logf(XT_NT_ERROR, "update_row:pbms_delete_row_blobs() Error: %s", result.mr_message);
75+ xt_logf(XT_NT_ERROR, "update_row:pbms_update_row_blobs() Error: %s", result.mr_message);
76 return err;
77 }
78- err = pbms_write_row_blobs(table, new_data, &result);
79- if (err) {
80- xt_logf(XT_NT_ERROR, "update_row:pbms_write_row_blobs() Error: %s", result.mr_message);
81- goto pbms_done;
82- }
83 #endif
84
85 /* GOTCHA: We need to check the auto-increment value on update
86@@ -2833,8 +2829,8 @@
87 pb_open_tab->ot_table->tab_locks.xt_remove_temp_lock(pb_open_tab, TRUE);
88
89 #ifdef PBMS_ENABLED
90- pbms_done:
91- pbms_completed(table, (err == 0));
92+ if (result.mr_had_blobs)
93+ pbms_completed(table, (err == 0));
94 #endif
95
96 return err;
97@@ -2879,7 +2875,8 @@
98 pb_open_tab->ot_table->tab_locks.xt_remove_temp_lock(pb_open_tab, TRUE);
99
100 #ifdef PBMS_ENABLED
101- pbms_completed(table, (err == 0));
102+ if (result.mr_had_blobs)
103+ pbms_completed(table, (err == 0));
104 #endif
105 return err;
106 }
107
108=== modified file 'src/pbms.h'
109--- src/pbms.h 2009-09-03 06:16:11 +0000
110+++ src/pbms.h 2010-07-21 22:08:48 +0000
111@@ -26,8 +26,8 @@
112 * are streaming enabled.
113 *
114 */
115-#ifndef __streaming_unx_h__
116-#define __streaming_unx_h__
117+#ifndef __PBMS_H__
118+#define __PBMS_H__
119
120 #include <stdio.h>
121 #include <sys/types.h>
122@@ -39,22 +39,17 @@
123 #include <signal.h>
124 #include <ctype.h>
125 #include <errno.h>
126-
127+#include <inttypes.h>
128+#include <stdint.h>
129
130 #ifdef USE_PRAGMA_INTERFACE
131 #pragma interface /* gcc class implementation */
132 #endif
133
134-/* 2 10 1 10 20 10 10 20 20
135- * Format: "~*"<db_id><'~' || '_'><tab_id>"-"<blob_id>"-"<auth_code>"-"<server_id>"-"<blob_ref_id>"-"<blob_size>
136- */
137-//If URL_FMT changes do not forget to update couldBeURL() in this file.
138-
139-#define URL_FMT "~*%lu%c%lu-%llu-%lx-%lu-%llu-%llu"
140
141 #define MS_SHARED_MEMORY_MAGIC 0x7E9A120C
142-#define MS_ENGINE_VERSION 1
143-#define MS_CALLBACK_VERSION 4
144+#define MS_ENGINE_VERSION 3
145+#define MS_CALLBACK_VERSION 6
146 #define MS_SHARED_MEMORY_VERSION 2
147 #define MS_ENGINE_LIST_SIZE 10
148 #define MS_TEMP_FILE_PREFIX "pbms_temp_"
149@@ -80,6 +75,8 @@
150 #define MS_ERR_DUPLICATE_DB 14
151 #define MS_ERR_DUPLICATE_DB_ID 15
152 #define MS_ERR_INVALID_OPERATION 16
153+#define MS_ERR_MISSING_CLOUD_REFFERENCE 17
154+#define MS_ERR_SYSTAB_VERSION 18
155
156 #define MS_LOCK_NONE 0
157 #define MS_LOCK_READONLY 1
158@@ -94,6 +91,7 @@
159 #define MS_RESULT_STACK_SIZE 200
160
161 typedef struct PBMSResultRec {
162+ uint8_t mr_had_blobs; /* A flag to indicate if the statement had any PBMS blobs. */
163 int mr_code; /* Engine specific error code. */
164 char mr_message[MS_RESULT_MESSAGE_SIZE]; /* Error message, required if non-zero return code. */
165 char mr_stack[MS_RESULT_STACK_SIZE]; /* Trace information about where the error occurred. */
166@@ -102,15 +100,28 @@
167
168
169 typedef struct PBMSBlobID {
170- u_int32_t bi_db_id;
171- u_int64_t bi_blob_size;
172- u_int64_t bi_blob_id; // or repo file offset if type = REPO
173- u_int64_t bi_blob_ref_id;
174- u_int32_t bi_tab_id; // or repo ID if type = REPO
175- u_int32_t bi_auth_code;
176- u_int32_t bi_blob_type;
177+ uint32_t bi_db_id;
178+ uint64_t bi_blob_size;
179+ uint64_t bi_blob_id; // or repo file offset if type = REPO
180+ uint64_t bi_blob_ref_id;
181+ uint32_t bi_tab_id; // or repo ID if type = REPO
182+ uint32_t bi_auth_code;
183+ uint32_t bi_blob_type;
184 } PBMSBlobIDRec, *PBMSBlobIDPtr;
185
186+
187+typedef struct MSBlobURL {
188+ uint8_t bu_type;
189+ uint32_t bu_db_id;
190+ uint32_t bu_tab_id; // or repo ID if type = REPO
191+ uint64_t bu_blob_id; // or repo file offset if type = REPO
192+ uint32_t bu_auth_code;
193+ uint32_t bu_server_id;
194+ uint64_t bu_blob_size;
195+ uint64_t bu_blob_ref_id; // Unique identifier of the blob reference
196+} MSBlobURLRec, *MSBlobURLPtr;
197+
198+
199 typedef struct PBMSBlobURL {
200 char bu_data[PBMS_BLOB_URL_SIZE];
201 } PBMSBlobURLRec, *PBMSBlobURLPtr;
202@@ -121,8 +132,82 @@
203 int ms_removing; /* TRUE (1) if the engine is being removed. */
204 int ms_internal; /* TRUE (1) if the engine is supported directly in the mysq/drizzle handler code . */
205 char ms_engine_name[32];
206+ int ms_has_transactions; /* TRUE (1) if the engine supports transactions. */
207 } PBMSEngineRec, *PBMSEnginePtr;
208
209+/* 2 10 1 10 20 10 10 20 20
210+ * Format: "~*"<db_id><'~' || '_'><tab_id>"-"<blob_id>"-"<auth_code>"-"<server_id>"-"<blob_ref_id>"-"<blob_size>
211+ */
212+
213+#ifndef PRIu64
214+#define URL_FMT "~*%u%c%u-%llu-%x-%u-%llu-%llu"
215+#else
216+#define URL_FMT "~*%"PRIu32"%c%"PRIu32"-%"PRIu64"-%x-%"PRIu32"-%"PRIu64"-%"PRIu64""
217+#endif
218+#define MS_URL_TYPE_BLOB '~'
219+#define MS_URL_TYPE_REPO '_'
220+class PBMSBlobURLTools
221+{
222+ public:
223+ static bool couldBeURL(const char *blob_url, size_t size, MSBlobURLPtr blob)
224+ {
225+ if (blob_url && (size < PBMS_BLOB_URL_SIZE)) {
226+ MSBlobURLRec ignored_blob;
227+ char buffer[PBMS_BLOB_URL_SIZE+1];
228+ char junk[5];
229+ int scanned;
230+
231+ if (!blob)
232+ blob = &ignored_blob;
233+
234+ junk[0] = 0;
235+ if (blob_url[size]) { // There is no guarantee that the URL will be null terminated.
236+ memcpy(buffer, blob_url, size);
237+ buffer[size] = 0;
238+ blob_url = buffer;
239+ }
240+
241+ scanned = sscanf(blob_url, URL_FMT"%4s",
242+ &blob->bu_db_id,
243+ &blob->bu_type,
244+ &blob->bu_tab_id,
245+ &blob->bu_blob_id,
246+ &blob->bu_auth_code,
247+ &blob->bu_server_id,
248+ &blob->bu_blob_ref_id,
249+ &blob->bu_blob_size,
250+ junk);
251+
252+ if ((scanned != 8) || (blob->bu_type != MS_URL_TYPE_BLOB && blob->bu_type != MS_URL_TYPE_REPO)) {// If junk is found at the end this will also result in an invalid URL.
253+ //printf("Bad URL \"%s\": scanned = %d, junk: %d, %d, %d, %d\n", blob_url, scanned, junk[0], junk[1], junk[2], junk[3]);
254+ return false;
255+ }
256+
257+ return true;
258+ }
259+
260+ return false;
261+ }
262+
263+ static bool couldBeURL(const char *blob_url, MSBlobURLPtr blob)
264+ {
265+ return couldBeURL(blob_url, strlen(blob_url), blob);
266+ }
267+
268+ static void buildBlobURL(MSBlobURLPtr blob, PBMSBlobURLPtr url)
269+ {
270+ snprintf(url->bu_data, PBMS_BLOB_URL_SIZE, URL_FMT, blob->bu_db_id,
271+ blob->bu_type,
272+ blob->bu_tab_id,
273+ blob->bu_blob_id,
274+ blob->bu_auth_code,
275+ blob->bu_server_id,
276+ blob->bu_blob_ref_id,
277+ blob->bu_blob_size);
278+ }
279+};
280+
281+#ifndef DRIZZLED
282 /*
283 * This function should never be called directly, it is called
284 * by deregisterEngine() below.
285@@ -138,7 +223,7 @@
286 *
287 * The BLOB URL must still be retained or it will automaticly be deleted after a timeout expires.
288 */
289-typedef int (*ECCreateBlobsFunc)(bool built_in, const char *db_name, const char *tab_name, char *blob, size_t blob_len, char *blob_url, unsigned short col_index, PBMSResultPtr result);
290+typedef int (*ECCreateBlobsFunc)(bool built_in, const char *db_name, const char *tab_name, char *blob, size_t blob_len, PBMSBlobURLPtr blob_url, PBMSResultPtr result);
291
292 /*
293 * Call this function for each BLOB to be retained. When a BLOB is used, the
294@@ -148,7 +233,7 @@
295 * The returned URL must be inserted into the row in place of the given
296 * URL.
297 */
298-typedef int (*ECRetainBlobsFunc)(bool built_in, const char *db_name, const char *tab_name, char *ret_blob_url, char *blob_url, unsigned short col_index, PBMSResultPtr result);
299+typedef int (*ECRetainBlobsFunc)(bool built_in, const char *db_name, const char *tab_name, PBMSBlobURLPtr ret_blob_url, char *blob_url, unsigned short col_index, PBMSResultPtr result);
300
301 /*
302 * If a row containing a BLOB is deleted, then the BLOBs in the
303@@ -161,7 +246,7 @@
304
305 typedef int (*ECDropTable)(bool built_in, const char *db_name, const char *tab_name, PBMSResultPtr result);
306
307-typedef int (*ECRenameTable)(bool built_in, const char *db_name, const char *from_table, const char *to_table, PBMSResultPtr result);
308+typedef int (*ECRenameTable)(bool built_in, const char *db_name, const char *from_table, const char *to_db, const char *to_table, PBMSResultPtr result);
309
310 typedef void (*ECCallCompleted)(bool built_in, bool ok);
311
312@@ -245,9 +330,9 @@
313 }
314
315 /*
316- * Register the engine with the Stream Engine.
317+ * Register the engine with the Stream Daemon.
318 */
319- int registerEngine(PBMSEnginePtr engine, PBMSResultPtr result) {
320+ int registerEngine(PBMSEnginePtr the_engine, PBMSResultPtr result) {
321 int err;
322
323 deleteTempFiles();
324@@ -256,8 +341,18 @@
325 if ((err = getSharedMemory(true, result)))
326 return err;
327
328+ lock();
329 for (int i=0; i<sharedMemory->sm_list_size; i++) {
330 if (!sharedMemory->sm_engine_list[i]) {
331+ PBMSEnginePtr engine;
332+ engine = (PBMSEnginePtr) malloc(sizeof(PBMSEngineRec));
333+ if (!engine) {
334+ strcpy(MS_RESULT_MESSAGE_SIZE, result->mr_message, "Out of memory.");
335+ err = MS_ERR_ENGINE;
336+ goto done;
337+ }
338+ memcpy(engine, the_engine, sizeof(PBMSEngineRec));
339+
340 sharedMemory->sm_engine_list[i] = engine;
341 engine->ms_index = i;
342 if (i >= sharedMemory->sm_list_len)
343@@ -266,14 +361,20 @@
344 sharedMemory->sm_callbacks->cb_register(engine);
345
346 built_in = (engine->ms_internal == 1);
347- return MS_OK;
348+ err = MS_OK;
349+ goto done;
350 }
351 }
352
353 result->mr_code = 15010;
354 strcpy(MS_RESULT_MESSAGE_SIZE, result->mr_message, "Too many BLOB streaming engines already registered");
355 *result->mr_stack = 0;
356- return MS_ERR_ENGINE;
357+
358+ err = MS_ERR_ENGINE;
359+
360+ done:
361+ unlock();
362+ return err;
363 }
364
365 void lock() {
366@@ -292,7 +393,7 @@
367 sharedMemory->sm_shutdown_lock--;
368 }
369
370- void deregisterEngine(PBMSEnginePtr engine) {
371+ void deregisterEngine(const char *engine_name) {
372 PBMSResultRec result;
373 int err;
374
375@@ -303,10 +404,12 @@
376
377 bool empty = true;
378 for (int i=0; i<sharedMemory->sm_list_len; i++) {
379- if (sharedMemory->sm_engine_list[i]) {
380- if (sharedMemory->sm_engine_list[i] == engine) {
381+ PBMSEnginePtr engine = sharedMemory->sm_engine_list[i];
382+ if (engine) {
383+ if (strcmp(engine->ms_engine_name, engine_name) == 0) {
384 if (sharedMemory->sm_callbacks)
385 sharedMemory->sm_callbacks->cb_deregister(engine);
386+ free(engine);
387 sharedMemory->sm_engine_list[i] = NULL;
388 }
389 else
390@@ -335,51 +438,22 @@
391 sharedMemory = NULL;
392
393 while (*prefix) {
394- getTempFileName(temp_file, *prefix, getpid());
395+ getTempFileName(temp_file, 100, *prefix, getpid());
396 unlink(temp_file);
397 prefix++;
398 }
399 }
400
401- int couldBeURL(char *blob_url, int size)
402+ bool isPBMSLoaded()
403 {
404- if (blob_url && (size < PBMS_BLOB_URL_SIZE)) {
405- char buffer[PBMS_BLOB_URL_SIZE+1];
406- unsigned long db_id = 0;
407- unsigned long tab_id = 0;
408- unsigned long long blob_id = 0;
409- unsigned long long blob_ref_id = 0;
410- unsigned long long blob_size = 0;
411- unsigned long auth_code = 0;
412- unsigned long server_id = 0;
413- char type, junk[5];
414- int scanned;
415-
416- junk[0] = 0;
417- if (blob_url[size]) { // There is no guarantee that the URL will be null terminated.
418- memcpy(buffer, blob_url, size);
419- buffer[size] = 0;
420- blob_url = buffer;
421- }
422-
423- scanned = sscanf(blob_url, URL_FMT"%4s", &db_id, &type, &tab_id, &blob_id, &auth_code, &server_id, &blob_ref_id, &blob_size, junk);
424- if (scanned != 8) {// If junk is found at the end this will also result in an invalid URL.
425- printf("Bad URL \"%s\": scanned = %d, junk: %d, %d, %d, %d\n", blob_url, scanned, junk[0], junk[1], junk[2], junk[3]);
426- return 0;
427- }
428-
429- if (junk[0] || (type != '~' && type != '_')) {
430- printf("Bad URL \"%s\": scanned = %d, junk: %d, %d, %d, %d\n", blob_url, scanned, junk[0], junk[1], junk[2], junk[3]);
431- return 0;
432- }
433-
434- return 1;
435- }
436-
437- return 0;
438+ PBMSResultRec result;
439+ if (getSharedMemory(false, &result))
440+ return false;
441+
442+ return (sharedMemory->sm_callbacks != NULL);
443 }
444
445- int retainBlob(const char *db_name, const char *tab_name, char *ret_blob_url, char *blob_url, size_t blob_size, unsigned short col_index, PBMSResultPtr result)
446+ int retainBlob(const char *db_name, const char *tab_name, PBMSBlobURLPtr ret_blob_url, char *blob_url, size_t blob_size, unsigned short col_index, PBMSResultPtr result)
447 {
448 int err;
449 char safe_url[PBMS_BLOB_URL_SIZE+1];
450@@ -388,17 +462,17 @@
451 if ((err = getSharedMemory(false, result)))
452 return err;
453
454- if (!couldBeURL(blob_url, blob_size)) {
455+ if (!PBMSBlobURLTools::couldBeURL(blob_url, blob_size, NULL)) {
456
457 if (!sharedMemory->sm_callbacks) {
458- *ret_blob_url = 0;
459+ ret_blob_url->bu_data[0] = 0;
460 return MS_OK;
461 }
462- err = sharedMemory->sm_callbacks->cb_create_blob(built_in, db_name, tab_name, blob_url, blob_size, ret_blob_url, col_index, result);
463+ err = sharedMemory->sm_callbacks->cb_create_blob(built_in, db_name, tab_name, blob_url, blob_size, ret_blob_url, result);
464 if (err)
465 return err;
466
467- blob_url = ret_blob_url;
468+ blob_url = ret_blob_url->bu_data;
469 } else {
470 // Make sure the url is a C string:
471 if (blob_url[blob_size]) {
472@@ -411,7 +485,7 @@
473
474 if (!sharedMemory->sm_callbacks) {
475 result->mr_code = MS_ERR_INCORRECT_URL;
476- strcpy(MS_RESULT_MESSAGE_SIZE, result->mr_message, "BLOB streaming engine (PBMS) not installed");
477+ strcpy(MS_RESULT_MESSAGE_SIZE, result->mr_message, "BLOB streaming daemon (PBMS) not installed");
478 *result->mr_stack = 0;
479 return MS_ERR_INCORRECT_URL;
480 }
481@@ -430,7 +504,7 @@
482 if (!sharedMemory->sm_callbacks)
483 return MS_OK;
484
485- if (!couldBeURL(blob_url, blob_size))
486+ if (!PBMSBlobURLTools::couldBeURL(blob_url, blob_size, NULL))
487 return MS_OK;
488
489 if (blob_url[blob_size]) {
490@@ -455,7 +529,7 @@
491 return sharedMemory->sm_callbacks->cb_drop_table(built_in, db_name, tab_name, result);
492 }
493
494- int renameTable(const char *db_name, const char *from_table, const char *to_table, PBMSResultPtr result)
495+ int renameTable(const char *db_name, const char *from_table, const char *to_db, const char *to_table, PBMSResultPtr result)
496 {
497 int err;
498
499@@ -465,7 +539,7 @@
500 if (!sharedMemory->sm_callbacks)
501 return MS_OK;
502
503- return sharedMemory->sm_callbacks->cb_rename_table(built_in, db_name, from_table, to_table, result);
504+ return sharedMemory->sm_callbacks->cb_rename_table(built_in, db_name, from_table, to_db, to_table, result);
505 }
506
507 void completed(int ok)
508@@ -495,7 +569,7 @@
509 return MS_OK;
510
511 while (*prefix) {
512- getTempFileName(temp_file, *prefix, getpid());
513+ getTempFileName(temp_file, 100, *prefix, getpid());
514 tmp_f = open(temp_file, O_RDWR | (create ? O_CREAT : 0), SH_MASK);
515 if (tmp_f == -1)
516 return setOSResult(errno, "open", temp_file, result);
517@@ -515,7 +589,7 @@
518 }
519
520 buffer[tfer] = 0;
521- sscanf(buffer, "%p", &sharedMemory);
522+ sscanf(buffer, "%p", (void**) &sharedMemory);
523 if (!sharedMemory || sharedMemory->sm_magic != MS_SHARED_MEMORY_MAGIC) {
524 if (!create)
525 return MS_OK;
526@@ -531,9 +605,9 @@
527 return setOSResult(errno, "fseek", temp_file, result);
528 }
529
530- sprintf(buffer, "%p", sharedMemory);
531+ snprintf(buffer, 100, "%p", (void*) sharedMemory);
532 tfer = write(tmp_f, buffer, strlen(buffer));
533- if (tfer != strlen(buffer)) {
534+ if (tfer != (ssize_t) strlen(buffer)) {
535 close(tmp_f);
536 return setOSResult(errno, "write", temp_file, result);
537 }
538@@ -583,9 +657,9 @@
539
540 void strcat(size_t size, char *to, int val)
541 {
542- char buffer[100];
543+ char buffer[20];
544
545- sprintf(buffer, "%d", val);
546+ snprintf(buffer, 20, "%d", val);
547 strcat(size, to, buffer);
548 }
549
550@@ -634,9 +708,9 @@
551 return MS_ERR_ENGINE;
552 }
553
554- void getTempFileName(char *temp_file, const char * prefix, int pid)
555+ void getTempFileName(char *temp_file, int buffer_size, const char * prefix, int pid)
556 {
557- sprintf(temp_file, "/tmp/%s%d", prefix, pid);
558+ snprintf(temp_file, buffer_size, "/tmp/%s%d", prefix, pid);
559 }
560
561 bool startsWith(const char *cstr, const char *w_cstr)
562@@ -652,43 +726,34 @@
563
564 void deleteTempFiles()
565 {
566- struct dirent *entry;
567+ struct dirent entry;
568 struct dirent *result;
569 DIR *odir;
570 int err;
571- size_t sz;
572 char temp_file[100];
573
574-#ifdef __sun
575- sz = sizeof(struct dirent) + pathconf("/tmp/", _PC_NAME_MAX); // Solaris, see readdir(3C)
576-#else
577- sz = sizeof(struct dirent);
578-#endif
579- if (!(entry = (struct dirent *) malloc(sz)))
580- return;
581 if (!(odir = opendir("/tmp/")))
582 return;
583- err = readdir_r(odir, entry, &result);
584+ err = readdir_r(odir, &entry, &result);
585 while (!err && result) {
586 const char **prefix = temp_prefix;
587
588 while (*prefix) {
589- if (startsWith(entry->d_name, *prefix)) {
590- int pid = atoi(entry->d_name + strlen(*prefix));
591+ if (startsWith(entry.d_name, *prefix)) {
592+ int pid = atoi(entry.d_name + strlen(*prefix));
593
594 /* If the process does not exist: */
595 if (kill(pid, 0) == -1 && errno == ESRCH) {
596- getTempFileName(temp_file, *prefix, pid);
597+ getTempFileName(temp_file, 100, *prefix, pid);
598 unlink(temp_file);
599 }
600 }
601 prefix++;
602 }
603
604- err = readdir_r(odir, entry, &result);
605+ err = readdir_r(odir, &entry, &result);
606 }
607 closedir(odir);
608- free(entry);
609 }
610 };
611 #endif // PBMS_API
612@@ -741,5 +806,7 @@
613 * PBMSIDToURL():Convert a blob URL to a blob ID.
614 */
615 extern bool PBMSURLToID(char *url, PBMSBlobIDPtr blob_id);
616-
617-#endif
618+#endif //DRIZZLED
619+
620+
621+#endif //__PBMS_H__
622
623=== modified file 'src/pbms_enabled.cc'
624--- src/pbms_enabled.cc 2010-05-04 16:23:07 +0000
625+++ src/pbms_enabled.cc 2010-07-21 22:08:48 +0000
626@@ -22,113 +22,205 @@
627 *
628 * H&G2JCtL
629 *
630- * PBMS interface used to enable engines for use with the PBMS engine.
631+ * PBMS interface used to enable engines for use with the PBMS daemon.
632 *
633 * For an example on how to build this into an engine have a look at the PBXT engine
634 * in file ha_pbxt.cc. Search for 'PBMS_ENABLED'.
635 *
636 */
637
638-#include "xt_config.h"
639-
640-#ifdef PBMS_ENABLED
641-
642-#ifdef DRIZZLED
643-#include <sys/stat.h>
644-#include <drizzled/common_includes.h>
645-#include <drizzled/plugin.h>
646+#ifndef DRIZZLED
647+#if defined(MSDOS) || defined(__WIN__)
648+#include "pbms_enabled.h"
649+
650+// Windows is not supported yet so just stub out the functions..
651+bool pbms_initialize(const char *engine_name __attribute__((unused)),
652+ bool isServer __attribute__((unused)),
653+ bool isTransactional __attribute__((unused)),
654+ PBMSResultPtr result __attribute__((unused)),
655+ IsPBMSFilterFunc is_pbms_blob __attribute__((unused))
656+ ) { return true;}
657+void pbms_finalize() {}
658+int pbms_write_row_blobs(const TABLE *table __attribute__((unused)),
659+ unsigned char *buf __attribute__((unused)),
660+ PBMSResultPtr result __attribute__((unused))
661+ ){ return 0;}
662+int pbms_update_row_blobs(const TABLE *table __attribute__((unused)),
663+ const unsigned char *old_row __attribute__((unused)),
664+ unsigned char *new_row __attribute__((unused)),
665+ PBMSResultPtr result __attribute__((unused))
666+ ){ return 0;}
667+int pbms_delete_row_blobs(const TABLE *table __attribute__((unused)),
668+ const unsigned char *buf __attribute__((unused)),
669+ PBMSResultPtr result __attribute__((unused))
670+ ){ return 0;}
671+int pbms_rename_table_with_blobs(const char *old_table_path __attribute__((unused)),
672+ const char *new_table_path __attribute__((unused)),
673+ PBMSResultPtr result __attribute__((unused))
674+ ){ return 0;}
675+int pbms_delete_table_with_blobs(const char *table_path __attribute__((unused)),
676+ PBMSResultPtr result __attribute__((unused))
677+ ){ return 0;}
678+void pbms_completed(TABLE *table __attribute__((unused)),
679+ bool ok __attribute__((unused))
680+ ){}
681 #else
682+#define PBMS_API pbms_enabled_api
683+
684+#include "pbms_enabled.h"
685 #include "mysql_priv.h"
686 #include <mysql/plugin.h>
687 #define session_alloc(sess, size) thd_alloc(sess, size);
688 #define current_session current_thd
689-#endif
690-
691-#define GET_BLOB_FIELD(t, i) (Field_blob *)(t->field[t->s->blob_field[i]])
692-#define DB_NAME(f) (f->table->s->db.str)
693-#define TAB_NAME(f) (*(f->table_name))
694-
695-#define PBMS_API pbms_enabled_api
696-
697-#include "pbms_enabled.h"
698+
699+#define GET_BLOB_FIELD(t, i) (Field_blob *)(t->field[t->s->blob_field[i]])
700+#define DB_NAME(f) (f->table->s->db.str)
701+#define TAB_NAME(f) (*(f->table_name))
702
703 static PBMS_API pbms_api;
704
705-PBMSEngineRec enabled_engine = {
706- MS_ENGINE_VERSION
707-};
708+/*
709+ * A callback function to check if the column is a PBMS BLOB.
710+ * Can be NULL if no check is to be done.
711+ */
712+static IsPBMSFilterFunc is_pbms_blob = NULL;
713
714 //====================
715-bool pbms_initialize(const char *engine_name, bool isServer, PBMSResultPtr result)
716+bool pbms_initialize(const char *engine_name, bool isServer, bool isTransactional, PBMSResultPtr result, IsPBMSFilterFunc is_pbms_blob_arg)
717 {
718- int err;
719+ int err;
720+ PBMSEngineRec enabled_engine = {
721+ MS_ENGINE_VERSION,
722+ 0,
723+ 0,
724+ 0,
725+ {0},
726+ 0
727+ };
728
729 strncpy(enabled_engine.ms_engine_name, engine_name, 32);
730 enabled_engine.ms_internal = isServer;
731+ enabled_engine.ms_has_transactions = isTransactional;
732 enabled_engine.ms_engine_name[31] = 0;
733
734 err = pbms_api.registerEngine(&enabled_engine, result);
735+ is_pbms_blob = is_pbms_blob_arg;
736
737 return (err == 0);
738 }
739
740
741 //====================
742-void pbms_finalize()
743-{
744- pbms_api.deregisterEngine(&enabled_engine);
745+void pbms_finalize(const char *engine_name)
746+{
747+ pbms_api.deregisterEngine(engine_name);
748+}
749+
750+//==================================
751+static int insertRecord(Field_blob *field, char *blob, size_t org_length, unsigned char *blob_rec, size_t packlength, PBMSResultPtr result)
752+{
753+ int err;
754+ size_t length;
755+ PBMSBlobURLRec blob_url;
756+
757+ err = pbms_api.retainBlob(DB_NAME(field), TAB_NAME(field), &blob_url, blob, org_length, field->field_index, result);
758+ if (err)
759+ return err;
760+
761+ // If the BLOB length changed reset it.
762+ // This will happen if the BLOB data was replaced with a BLOB reference.
763+ length = strlen(blob_url.bu_data) +1;
764+ if ((length != org_length) || memcmp(blob_url.bu_data, blob, length)) {
765+ if (length != org_length) {
766+ field->store_length(blob_rec, packlength, length);
767+ }
768+
769+ if (length > org_length) {
770+ // This can only happen if the BLOB URL is actually larger than the BLOB itself.
771+ blob = (char *) session_alloc(current_session, length);
772+ memcpy(blob_rec+packlength, &blob, sizeof(char*));
773+ }
774+ memcpy(blob, blob_url.bu_data, length);
775+ }
776+
777+ return 0;
778 }
779
780 //====================
781-int pbms_write_row_blobs(TABLE *table, uchar *row_buffer, PBMSResultPtr result)
782+int pbms_update_row_blobs(const TABLE *table, const unsigned char *old_row, unsigned char *new_row, PBMSResultPtr result)
783 {
784 Field_blob *field;
785- char *blob_rec, *blob;
786- size_t packlength, i, org_length, length;
787- char blob_url_buffer[PBMS_BLOB_URL_SIZE];
788+ uint32_t field_offset;
789+ const unsigned char *old_blob_rec;
790+ unsigned char *new_blob_rec;
791+ char *old_blob_url, *new_blob_url;
792+ size_t packlength, i, old_length, new_length;
793 int err;
794- String type_name;
795+ bool old_null_blob, new_null_blob;
796
797+ result->mr_had_blobs = false;
798+
799+ if (!pbms_api.isPBMSLoaded())
800+ return 0;
801+
802 if (table->s->blob_fields == 0)
803 return 0;
804
805 for (i= 0; i < table->s->blob_fields; i++) {
806 field = GET_BLOB_FIELD(table, i);
807
808- // Note: field->type() always returns MYSQL_TYPE_BLOB regardless of the type of BLOB
809- field->sql_type(type_name);
810- if (strcasecmp(type_name.c_ptr(), "LongBlob"))
811- continue;
812+ old_null_blob = field->is_null_in_record(old_row);
813+ new_null_blob = field->is_null_in_record(new_row);
814+ if (old_null_blob && new_null_blob)
815+ continue;
816+
817+ {
818+ String type_name;
819+ // Note: field->type() always returns MYSQL_TYPE_BLOB regardless of the type of BLOB
820+ field->sql_type(type_name);
821+ if (strcasecmp(type_name.c_ptr(), "LongBlob"))
822+ continue;
823+ }
824+
825+ if( is_pbms_blob && !is_pbms_blob(field) )
826+ continue;
827+
828
829 // Get the blob record:
830- blob_rec = (char *)row_buffer + field->offset(field->table->record[0]);
831+ field_offset = field->offset(field->table->record[0]);
832 packlength = field->pack_length() - field->table->s->blob_ptr_size;
833
834- memcpy(&blob, blob_rec +packlength, sizeof(char*));
835- org_length = field->get_length((uchar *)blob_rec);
836+ if (new_null_blob) {
837+ new_blob_url = NULL;
838+ } else {
839+ new_blob_rec = new_row + field_offset;
840+ new_length = field->get_length(new_blob_rec);
841+ memcpy(&new_blob_url, new_blob_rec +packlength, sizeof(char*));
842+ }
843+
844+ if (old_null_blob) {
845+ old_blob_url = NULL;
846+ } else {
847+ old_blob_rec = old_row + field_offset;
848+ old_length = field->get_length(old_blob_rec);
849+ memcpy(&old_blob_url, old_blob_rec +packlength, sizeof(char*));
850+ }
851+
852+ // Check to see if the BLOBs are the same.
853+ // I am assuming that if the BLOB pointer is different then teh BLOB has changed.
854+ // Zero length BLOBs are a special case because they may have a NULL data pointer,
855+ // to catch this and distiguish it from a NULL BLOB I do a check to see if one field was NULL:
856+ // (old_null_blob != new_null_blob)
857+ if ((old_blob_url != new_blob_url) || (old_null_blob != new_null_blob)) {
858+
859+ result->mr_had_blobs = true;
860
861-
862- // Signal PBMS to record a new reference to the BLOB.
863- // If 'blob' is not a BLOB URL then it will be stored in the repositor as a new BLOB
864- // and a reference to it will be created.
865- err = pbms_api.retainBlob(DB_NAME(field), TAB_NAME(field), blob_url_buffer, blob, org_length, field->field_index, result);
866- if (err)
867- return err;
868-
869- // If the BLOB length changed reset it.
870- // This will happen if the BLOB data was replaced with a BLOB reference.
871- length = strlen(blob_url_buffer) +1;
872- if ((length != org_length) || memcmp(blob_url_buffer, blob, length)) {
873- if (length != org_length) {
874- field->store_length((uchar *)blob_rec, packlength, length);
875- }
876-
877- if (length > org_length) {
878- // This can only happen if the BLOB URL is actually larger than the BLOB itself.
879- blob = (char *) session_alloc(current_session, length);
880- memcpy(blob_rec+packlength, &blob, sizeof(char*));
881- }
882- memcpy(blob, blob_url_buffer, length);
883+ // The BLOB was updated so delete the old one and insert the new one.
884+ if ((old_null_blob == false) && (err = pbms_api.releaseBlob(DB_NAME(field), TAB_NAME(field), old_blob_url, old_length, result)))
885+ return err;
886+
887+ if ((new_null_blob == false) && (err = insertRecord(field, new_blob_url, new_length, new_blob_rec, packlength, result)))
888+ return err;
889 }
890 }
891
892@@ -136,33 +228,100 @@
893 }
894
895 //====================
896-int pbms_delete_row_blobs(TABLE *table, const uchar *row_buffer, PBMSResultPtr result)
897-{
898- Field_blob *field;
899- const char *blob_rec;
900+int pbms_write_row_blobs(const TABLE *table, unsigned char *row_buffer, PBMSResultPtr result)
901+{
902+
903+ Field_blob *field;
904+ unsigned char *blob_rec;
905+ char *blob_url;
906+ size_t packlength, i, length;
907+ int err;
908+
909+ result->mr_had_blobs = false;
910+
911+ if (!pbms_api.isPBMSLoaded())
912+ return 0;
913+
914+ if (table->s->blob_fields == 0)
915+ return 0;
916+
917+ for (i= 0; i < table->s->blob_fields; i++) {
918+ field = GET_BLOB_FIELD(table, i);
919+
920+ if (field->is_null_in_record(row_buffer))
921+ continue;
922+
923+ {
924+ String type_name;
925+ // Note: field->type() always returns MYSQL_TYPE_BLOB regardless of the type of BLOB
926+ field->sql_type(type_name);
927+ if (strcasecmp(type_name.c_ptr(), "LongBlob"))
928+ continue;
929+ }
930+
931+ if( is_pbms_blob && !is_pbms_blob(field) )
932+ continue;
933+
934+ result->mr_had_blobs = true;
935+
936+ // Get the blob record:
937+ packlength = field->pack_length() - field->table->s->blob_ptr_size;
938+ blob_rec = row_buffer + field->offset(field->table->record[0]);
939+
940+ length = field->get_length(blob_rec);
941+ memcpy(&blob_url, blob_rec +packlength, sizeof(char*));
942+
943+ if ((err = insertRecord(field, blob_url, length, blob_rec, packlength, result)))
944+ return err;
945+ }
946+
947+ return 0;
948+}
949+
950+//====================
951+int pbms_delete_row_blobs(const TABLE *table, const unsigned char *row_buffer, PBMSResultPtr result)
952+{
953+ Field_blob *field;
954+ const unsigned char *blob_rec;
955 char *blob;
956 size_t packlength, i, length;
957+ bool call_failed = false;
958 int err;
959- String type_name;
960+
961+ result->mr_had_blobs = false;
962
963+ if (!pbms_api.isPBMSLoaded())
964+ return 0;
965+
966 if (table->s->blob_fields == 0)
967 return 0;
968
969 for (i= 0; i < table->s->blob_fields; i++) {
970 field = GET_BLOB_FIELD(table, i);
971+
972+ if (field->is_null_in_record(row_buffer))
973+ continue;
974+
975+ {
976+ String type_name;
977+ // Note: field->type() always returns MYSQL_TYPE_BLOB regardless of the type of BLOB
978+ field->sql_type(type_name);
979+ if (strcasecmp(type_name.c_ptr(), "LongBlob"))
980+ continue;
981+ }
982+
983+ if(is_pbms_blob && !is_pbms_blob(field) )
984+ continue;
985
986- // Note: field->type() always returns MYSQL_TYPE_BLOB regardless of the type of BLOB
987- field->sql_type(type_name);
988- if (strcasecmp(type_name.c_ptr(), "LongBlob"))
989- continue;
990-
991+ result->mr_had_blobs = true;
992+
993 // Get the blob record:
994- blob_rec = (char *)row_buffer + field->offset(field->table->record[0]);
995 packlength = field->pack_length() - field->table->s->blob_ptr_size;
996
997- length = field->get_length((uchar *)blob_rec);
998+ blob_rec = row_buffer + field->offset(field->table->record[0]);
999+ length = field->get_length(blob_rec);
1000 memcpy(&blob, blob_rec +packlength, sizeof(char*));
1001-
1002+
1003 // Signal PBMS to delete the reference to the BLOB.
1004 err = pbms_api.releaseBlob(DB_NAME(field), TAB_NAME(field), blob, length, result);
1005 if (err)
1006@@ -208,18 +367,16 @@
1007 {
1008 char o_db_name[MAX_NAME_SIZE], n_db_name[MAX_NAME_SIZE], o_tab_name[MAX_NAME_SIZE], n_tab_name[MAX_NAME_SIZE];
1009
1010+ result->mr_had_blobs = false;
1011+ if (!pbms_api.isPBMSLoaded())
1012+ return 0;
1013+
1014+ result->mr_had_blobs = true; // Assume it has blobs.
1015+
1016 parse_table_path(old_table_path, o_db_name, o_tab_name);
1017 parse_table_path(new_table_path, n_db_name, n_tab_name);
1018
1019- if (strcmp(o_db_name, n_db_name)) {
1020- result->mr_code = MS_ERR_INVALID_OPERATION;
1021- strcpy(result->mr_message, "PBMS does not support renaming tables across databases.");
1022- strcpy(result->mr_stack, "pbms_rename_table_with_blobs()");
1023- return MS_ERR_INVALID_OPERATION;
1024- }
1025-
1026-
1027- return pbms_api.renameTable(o_db_name, o_tab_name, n_tab_name, result);
1028+ return pbms_api.renameTable(o_db_name, o_tab_name, n_db_name, n_tab_name, result);
1029 }
1030
1031 //====================
1032@@ -227,23 +384,26 @@
1033 {
1034 char db_name[MAX_NAME_SIZE], tab_name[MAX_NAME_SIZE];
1035
1036+ result->mr_had_blobs = false;
1037+ if (!pbms_api.isPBMSLoaded())
1038+ return 0;
1039+
1040+ result->mr_had_blobs = true; // Assume it has blobs.
1041 parse_table_path(table_path, db_name, tab_name);
1042
1043 return pbms_api.dropTable(db_name, tab_name, result);
1044 }
1045
1046 //====================
1047-void pbms_completed(TABLE *table, bool ok)
1048+void pbms_completed(const TABLE *table, bool ok)
1049 {
1050+ if (!pbms_api.isPBMSLoaded())
1051+ return;
1052+
1053 if ((!table) || (table->s->blob_fields != 0))
1054 pbms_api.completed(ok) ;
1055
1056 return ;
1057 }
1058-
1059-#elif defined(__WIN__)
1060-
1061-// Remove linker warning 4221 about empty file
1062-namespace { char dummy; };
1063-
1064-#endif // PBMS_ENABLED
1065+#endif
1066+#endif // DRIZZLED
1067
1068=== modified file 'src/pbms_enabled.h'
1069--- src/pbms_enabled.h 2009-09-16 06:39:30 +0000
1070+++ src/pbms_enabled.h 2010-07-21 22:08:48 +0000
1071@@ -22,7 +22,7 @@
1072 *
1073 * H&G2JCtL
1074 *
1075- * PBMS interface used to enable engines for use with the PBMS engine.
1076+ * PBMS interface used to enable engines for use with the PBMS daemon.
1077 *
1078 * For an example on how to build this into an engine have a look at the PBXT engine
1079 * in file ha_pbxt.cc. Search for 'PBMS_ENABLED'.
1080@@ -35,6 +35,17 @@
1081
1082 #include "pbms.h"
1083
1084+#ifdef DRIZZLED
1085+#include <drizzled/common.h>
1086+#define TABLE Table
1087+#define uchar unsigned char
1088+#else
1089+#include <mysql_priv.h>
1090+#endif
1091+
1092+class Field;
1093+typedef bool (*IsPBMSFilterFunc)(Field *field);
1094+
1095 /*
1096 * pbms_initialize() should be called from the engines plugIn's 'init()' function.
1097 * The engine_name is the name of your engine, "PBXT" or "InnoDB" for example.
1098@@ -43,31 +54,38 @@
1099 * true if this is being built into the server's handler code above the engine level
1100 * calls.
1101 */
1102-extern bool pbms_initialize(const char *engine_name, bool isServer, PBMSResultPtr result);
1103+extern bool pbms_initialize(const char *engine_name, bool isServer, bool isTransactional, PBMSResultPtr result, IsPBMSFilterFunc is_pbms_blob);
1104
1105 /*
1106 * pbms_finalize() should be called from the engines plugIn's 'deinit()' function.
1107 */
1108-extern void pbms_finalize();
1109+extern void pbms_finalize(const char *engine_name);
1110
1111 /*
1112 * pbms_write_row_blobs() should be called from the engine's 'write_row' function.
1113 * It can alter the row data so it must be called before any other function using the row data.
1114- * It should also be called from engine's 'update_row' function for the new row.
1115- *
1116- * pbms_completed() must be called after calling pbms_write_row_blobs() and just before
1117- * returning from write_row() to indicate if the operation completed successfully.
1118- */
1119-extern int pbms_write_row_blobs(TABLE *table, uchar *buf, PBMSResultPtr result);
1120+ *
1121+ * pbms_completed() must be called after calling pbms_write_row_blobs() and just before
1122+ * returning from write_row() to indicate if the operation completed successfully.
1123+ */
1124+extern int pbms_write_row_blobs(const TABLE *table, unsigned char *buf, PBMSResultPtr result);
1125+
1126+/*
1127+ * pbms_update_row_blobs() should be called from the engine's 'update_row' function.
1128+ * It can alter the row data so it must be called before any other function using the row data.
1129+ *
1130+ * pbms_completed() must be called after calling pbms_write_row_blobs() and just before
1131+ * returning from write_row() to indicate if the operation completed successfully.
1132+ */
1133+extern int pbms_update_row_blobs(const TABLE *table, const unsigned char *old_row, unsigned char *new_row, PBMSResultPtr result);
1134
1135 /*
1136 * pbms_delete_row_blobs() should be called from the engine's 'delete_row' function.
1137- * It should also be called from engine's 'update_row' function for the old row.
1138 *
1139 * pbms_completed() must be called after calling pbms_delete_row_blobs() and just before
1140 * returning from delete_row() to indicate if the operation completed successfully.
1141 */
1142-extern int pbms_delete_row_blobs(TABLE *table, const uchar *buf, PBMSResultPtr result);
1143+extern int pbms_delete_row_blobs(const TABLE *table, const unsigned char *buf, PBMSResultPtr result);
1144
1145 /*
1146 * pbms_rename_table_with_blobs() should be called from the engine's 'rename_table' function.
1147@@ -98,6 +116,6 @@
1148 * pbms_completed() has the effect of committing or rolling back the changes made if the session
1149 * is in 'autocommit' mode.
1150 */
1151-extern void pbms_completed(TABLE *table, bool ok);
1152+extern void pbms_completed(const TABLE *table, bool ok);
1153
1154 #endif

Subscribers

People subscribed via source and target branches