Merge lp:~barry-leslie/drizzle/drizzle_pbmsV2 into lp:~drizzle-trunk/drizzle/development

Proposed by Barry Leslie
Status: Work in progress
Proposed branch: lp:~barry-leslie/drizzle/drizzle_pbmsV2
Merge into: lp:~drizzle-trunk/drizzle/development
Diff against target: 42221 lines (+16542/-20847)
128 files modified
plugin/pbms/plugin.am (+0/-12)
plugin/pbms/plugin.ini (+121/-112)
plugin/pbms/src/TransTest.cc (+0/-953)
plugin/pbms/src/alias_ms.cc (+0/-1092)
plugin/pbms/src/alias_ms.h (+0/-350)
plugin/pbms/src/api_ms.cc (+0/-270)
plugin/pbms/src/backup_ms.cc (+0/-733)
plugin/pbms/src/backup_ms.h (+0/-186)
plugin/pbms/src/cloud/cloud_ms.cc (+162/-271)
plugin/pbms/src/cloud/cloud_ms.h (+61/-139)
plugin/pbms/src/compactor_ms.cc (+0/-343)
plugin/pbms/src/compactor_ms.h (+0/-64)
plugin/pbms/src/cslib/CSConfig.h (+10/-10)
plugin/pbms/src/cslib/CSDefs.h (+2/-1)
plugin/pbms/src/cslib/CSEncode.h (+1/-1)
plugin/pbms/src/cslib/CSException.h (+1/-0)
plugin/pbms/src/cslib/CSFile.cc (+128/-0)
plugin/pbms/src/cslib/CSFile.h (+82/-1)
plugin/pbms/src/cslib/CSGlobal.h (+1/-0)
plugin/pbms/src/cslib/CSHTTPStream.h (+1/-0)
plugin/pbms/src/cslib/CSIdSpace.cc (+366/-0)
plugin/pbms/src/cslib/CSIdSpace.h (+89/-0)
plugin/pbms/src/cslib/CSLog.cc (+2/-5)
plugin/pbms/src/cslib/CSMemory.cc (+3/-0)
plugin/pbms/src/cslib/CSMutex.cc (+1/-2)
plugin/pbms/src/cslib/CSObject.h (+3/-1)
plugin/pbms/src/cslib/CSPath.cc (+24/-5)
plugin/pbms/src/cslib/CSPath.h (+2/-0)
plugin/pbms/src/cslib/CSS3Protocol.cc (+22/-11)
plugin/pbms/src/cslib/CSSocket.cc (+44/-4)
plugin/pbms/src/cslib/CSSocket.h (+12/-2)
plugin/pbms/src/cslib/CSStorage.cc (+30/-6)
plugin/pbms/src/cslib/CSStorage.h (+8/-3)
plugin/pbms/src/cslib/CSStrUtil.cc (+0/-1)
plugin/pbms/src/cslib/CSStrUtil.h (+0/-2)
plugin/pbms/src/cslib/CSStream.cc (+66/-0)
plugin/pbms/src/cslib/CSStream.h (+37/-3)
plugin/pbms/src/cslib/CSString.cc (+1/-3)
plugin/pbms/src/cslib/CSString.h (+3/-0)
plugin/pbms/src/cslib/CSSys.h (+2/-0)
plugin/pbms/src/cslib/CSSys_unix.cc (+11/-0)
plugin/pbms/src/cslib/CSSys_win.cc (+7/-0)
plugin/pbms/src/cslib/CSThread.cc (+5/-10)
plugin/pbms/src/cslib/CSThread.h (+6/-0)
plugin/pbms/src/daemon/pbmsdaemon_ms.cc (+1084/-19)
plugin/pbms/src/daemon/pbmsdaemon_ms.h (+133/-11)
plugin/pbms/src/database/database_ms.cc (+630/-1532)
plugin/pbms/src/database/database_ms.h (+101/-182)
plugin/pbms/src/discover_ms.cc (+0/-1390)
plugin/pbms/src/discover_ms.h (+0/-96)
plugin/pbms/src/includes/defs_ms.h (+40/-14)
plugin/pbms/src/includes/pbms.h (+32/-708)
plugin/pbms/src/includes/pbms_config.h (+57/-0)
plugin/pbms/src/includes/version_ms.h (+1/-1)
plugin/pbms/src/lib/pbmslib.cc (+1844/-0)
plugin/pbms/src/lib/pbmslib.h (+99/-85)
plugin/pbms/src/mysql_ms.cc (+0/-104)
plugin/pbms/src/mysql_ms.h (+0/-40)
plugin/pbms/src/network/connection_handler_ms.cc (+289/-168)
plugin/pbms/src/network/connection_handler_ms.h (+9/-3)
plugin/pbms/src/network/network_ms.cc (+7/-8)
plugin/pbms/src/network/network_ms.h (+1/-1)
plugin/pbms/src/open_table_ms.cc (+0/-1121)
plugin/pbms/src/open_table_ms.h (+0/-213)
plugin/pbms/src/pbms_enabled.cc (+0/-421)
plugin/pbms/src/pbms_enabled.h (+0/-133)
plugin/pbms/src/plugin/drizzle/events_ms.cc (+15/-14)
plugin/pbms/src/plugin/drizzle/events_ms.h (+7/-2)
plugin/pbms/src/plugin/engine_ms.cc (+291/-699)
plugin/pbms/src/plugin/engine_ms.h (+8/-47)
plugin/pbms/src/plugin/ha_pbms.cc (+242/-373)
plugin/pbms/src/plugin/ha_pbms.h (+18/-14)
plugin/pbms/src/plugin/parameters_ms.cc (+90/-101)
plugin/pbms/src/plugin/parameters_ms.h (+5/-6)
plugin/pbms/src/plugin/plugin_ms.cc (+47/-19)
plugin/pbms/src/repository/data/blob_repository_ms.cc (+1203/-0)
plugin/pbms/src/repository/data/blob_repository_ms.h (+162/-0)
plugin/pbms/src/repository/data/repository_file_ms.cc (+790/-0)
plugin/pbms/src/repository/data/repository_file_ms.h (+108/-0)
plugin/pbms/src/repository/data/repository_ms.cc (+535/-1606)
plugin/pbms/src/repository/data/repository_ms.h (+136/-231)
plugin/pbms/src/repository/index/repo_index_ms.cc (+862/-0)
plugin/pbms/src/repository/index/repo_index_ms.h (+206/-0)
plugin/pbms/src/repository/logs/temp_log_ms.cc (+399/-331)
plugin/pbms/src/repository/logs/temp_log_ms.h (+49/-81)
plugin/pbms/src/repository/references/blob_refs.cc (+298/-0)
plugin/pbms/src/repository/references/blob_refs.h (+100/-0)
plugin/pbms/src/repository/references/bloburl.cc (+328/-0)
plugin/pbms/src/repository/references/bloburl.h (+217/-0)
plugin/pbms/src/repository/references/metadata_ms.cc (+296/-1)
plugin/pbms/src/repository/references/metadata_ms.h (+60/-83)
plugin/pbms/src/repository/references/table_ms.cc (+564/-339)
plugin/pbms/src/repository/references/table_ms.h (+60/-82)
plugin/pbms/src/systab_backup_ms.cc (+0/-689)
plugin/pbms/src/systab_backup_ms.h (+0/-78)
plugin/pbms/src/systab_dump_ms.cc (+0/-586)
plugin/pbms/src/systab_dump_ms.h (+0/-63)
plugin/pbms/src/systab_enabled_ms.cc (+0/-192)
plugin/pbms/src/systab_enabled_ms.h (+0/-53)
plugin/pbms/src/systab_variable_ms.cc (+0/-791)
plugin/pbms/src/systab_variable_ms.h (+0/-73)
plugin/pbms/src/system_tables/pbms_blob.cc (+134/-0)
plugin/pbms/src/system_tables/pbms_blob.h (+61/-0)
plugin/pbms/src/system_tables/pbms_cloud.cc (+233/-251)
plugin/pbms/src/system_tables/pbms_cloud.h (+12/-12)
plugin/pbms/src/system_tables/pbms_metadata.cc (+470/-0)
plugin/pbms/src/system_tables/pbms_metadata.h (+83/-0)
plugin/pbms/src/system_tables/pbms_metadata_header.cc (+104/-87)
plugin/pbms/src/system_tables/pbms_metadata_header.h (+15/-15)
plugin/pbms/src/system_tables/pbms_reference.cc (+387/-0)
plugin/pbms/src/system_tables/pbms_reference.h (+75/-0)
plugin/pbms/src/system_tables/pbms_repository.cc (+506/-0)
plugin/pbms/src/system_tables/pbms_repository.h (+85/-0)
plugin/pbms/src/system_tables/pbms_server.cc (+468/-0)
plugin/pbms/src/system_tables/pbms_server.h (+115/-0)
plugin/pbms/src/system_tables/systab_util_ms.cc (+98/-12)
plugin/pbms/src/system_tables/systab_util_ms.h (+72/-6)
plugin/pbms/src/system_tables/system_table_ms.cc (+270/-2136)
plugin/pbms/src/system_tables/system_table_ms.h (+124/-251)
plugin/pbms/src/transactions/trans_cache_ms.cc (+1/-3)
plugin/pbms/src/transactions/trans_log_ms.cc (+9/-14)
plugin/pbms/src/transactions/trans_log_ms.h (+2/-18)
plugin/pbms/src/transactions/transaction_ms.cc (+125/-26)
plugin/pbms/src/transactions/transaction_ms.h (+4/-9)
plugin/pbms/src/udf_ms.cc (+0/-549)
plugin/pbms/tests/r/basic.result (+163/-19)
plugin/pbms/tests/t/basic.test (+187/-37)
po/POTFILES.in (+2/-2)
To merge this branch: bzr merge lp:~barry-leslie/drizzle/drizzle_pbmsV2
Reviewer Review Type Date Requested Status
Monty Taylor Needs Fixing
Review via email: mp+56493@code.launchpad.net

Description of the change

This is the new version of PBMS which is pretty much a rewrite of the old version. This version fixes some limitations in the old version and is designed to work with replication.

It is NOT backward compatible with the old version. If this is a problem for anyone they can contact me and I will help them porting from the old version to the new one.

To post a comment you must log in.
Revision history for this message
Monty Taylor (mordred) wrote :
review: Needs Fixing
Revision history for this message
Barry Leslie (barry-leslie) wrote :

This is a test case problem, it is assuming the tests are being run in a new
database.

I will see if I can change this so that it will give the expected results
when run twice.

On 4/7/11 9:47 AM, "Monty Taylor" <email address hidden> wrote:

> Review: Needs Fixing
> Hey Barry,
>
> This branch fails the "run tests twice" job:
>
> http://jenkins.drizzle.org/view/Drizzle-build/job/drizzle-build-repeat-tests-t
> wice/387/console
>

-------------------------------------------------------------------------
Barry Leslie

Revision history for this message
Barry Leslie (barry-leslie) wrote :

"run tests twice" now works.

Revision history for this message
Barry Leslie (barry-leslie) wrote :

I am continuing to work on this branch and will let people know when I think it is ready to be merged.

1979. By Barry Leslie

Merged with drizzle trunk

1980. By Barry Leslie

Use correct charset.h

1981. By Barry Leslie

Merged changes from the main pbms project.

Unmerged revisions

1981. By Barry Leslie

Merged changes from the main pbms project.

1980. By Barry Leslie

Use correct charset.h

1979. By Barry Leslie

Merged with drizzle trunk

1978. By Barry Leslie

Updated test case so it can be run multiple times.

1977. By Barry Leslie

Fixed fo renamed pbms files in POTFILES.in

1976. By Barry Leslie

fixed un initialized variable warning.

1975. By Barry Leslie

Code cleanup

1974. By Barry Leslie

Merged in lp:drizzle.

1973. By Barry Leslie

Added warning message to expected test out put until bug 746720 is fixed.

1972. By Barry Leslie

Removed an invalid test from the test cases.

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== removed file 'plugin/pbms/plugin.am'
--- plugin/pbms/plugin.am 2010-07-14 22:58:28 +0000
+++ plugin/pbms/plugin.am 1970-01-01 00:00:00 +0000
@@ -1,12 +0,0 @@
1
2EXTRA_DIST+= \
3 plugin/pbms/src/TransTest.cc \
4 plugin/pbms/src/alias_ms.cc \
5 plugin/pbms/src/alias_ms.h \
6 plugin/pbms/src/discover_ms.cc \
7 plugin/pbms/src/pbms_enabled.cc \
8 plugin/pbms/src/pbms_enabled.h \
9 plugin/pbms/src/systab_enabled_ms.cc \
10 plugin/pbms/src/systab_enabled_ms.h \
11 plugin/pbms/src/udf_ms.cc
12
130
=== modified file 'plugin/pbms/plugin.ini'
--- plugin/pbms/plugin.ini 2010-12-18 04:43:40 +0000
+++ plugin/pbms/plugin.ini 2011-06-01 23:57:43 +0000
@@ -19,121 +19,132 @@
19title=PrimeBase Media Stream Daemon19title=PrimeBase Media Stream Daemon
20description=Provides BLOB streaming service for storage engines20description=Provides BLOB streaming service for storage engines
21load_by_default=no21load_by_default=no
22cxxflags=-DDRIZZLED -DPBMS_VERSION=0.5.14-beta22cxxflags= -DDRIZZLED -DPBMS_VERSION=0.6.01-beta -I$(top_srcdir)/plugin/pbms/src
23build_conditional="${ac_cv_libcurl}" = "yes"23build_conditional="${ac_cv_libcurl}" = "yes"
24ldflags= ${LTLIBCURL}24ldflags= ${LTLIBCURL}
2525
26# disabled=yes26# disabled=yes
2727
28sources=src/plugin_ms.cc28sources=src/plugin/plugin_ms.cc
29 src/backup_ms.cc29 src/cloud/cloud_ms.cc
30 src/cloud_ms.cc30 src/daemon/pbmsdaemon_ms.cc
31 src/compactor_ms.cc31 src/database/database_ms.cc
32 src/connection_handler_ms.cc32 src/network/connection_handler_ms.cc
33 src/database_ms.cc33 src/network/network_ms.cc
34 src/engine_ms.cc34 src/plugin/drizzle/events_ms.cc
35 src/events_ms.cc35 src/plugin/engine_ms.cc
36 src/mysql_ms.cc36 src/plugin/ha_pbms.cc
37 src/network_ms.cc37 src/plugin/parameters_ms.cc
38 src/open_table_ms.cc38 src/repository/data/blob_repository_ms.cc
39 src/parameters_ms.cc39 src/repository/data/repository_file_ms.cc
40 src/pbmsdaemon_ms.cc40 src/repository/data/repository_ms.cc
41 src/ha_pbms.cc41 src/repository/index/repo_index_ms.cc
42 src/repository_ms.cc42 src/repository/logs/temp_log_ms.cc
43 src/systab_backup_ms.cc43 src/repository/references/blob_refs.cc
44 src/systab_cloud_ms.cc44 src/repository/references/bloburl.cc
45 src/systab_dump_ms.cc45 src/repository/references/metadata_ms.cc
46 src/systab_httpheader_ms.cc46 src/repository/references/table_ms.cc
47 src/systab_util_ms.cc47 src/system_tables/pbms_blob.cc
48 src/systab_variable_ms.cc48 src/system_tables/pbms_cloud.cc
49 src/system_table_ms.cc49 src/system_tables/pbms_metadata.cc
50 src/table_ms.cc50 src/system_tables/pbms_metadata_header.cc
51 src/temp_log_ms.cc51 src/system_tables/pbms_reference.cc
52 src/transaction_ms.cc52 src/system_tables/pbms_repository.cc
53 src/trans_cache_ms.cc53 src/system_tables/pbms_server.cc
54 src/trans_log_ms.cc54 src/system_tables/systab_util_ms.cc
55 src/cslib/CSDirectory.cc55 src/system_tables/system_table_ms.cc
56 src/cslib/CSEncode.cc56 src/transactions/trans_cache_ms.cc
57 src/cslib/CSException.cc57 src/transactions/trans_log_ms.cc
58 src/cslib/CSFile.cc58 src/transactions/transaction_ms.cc
59 src/cslib/CSHTTPStream.cc59 src/cslib/CSDirectory.cc
60 src/cslib/CSLog.cc60 src/cslib/CSEncode.cc
61 src/cslib/CSMd5.cc61 src/cslib/CSException.cc
62 src/cslib/CSMemory.cc62 src/cslib/CSFile.cc
63 src/cslib/CSMutex.cc63 src/cslib/CSHTTPStream.cc
64 src/cslib/CSObject.cc64 src/cslib/CSLog.cc
65 src/cslib/CSPath.cc65 src/cslib/CSMd5.cc
66 src/cslib/CSS3Protocol.cc66 src/cslib/CSMemory.cc
67 src/cslib/CSSha1.cc67 src/cslib/CSMutex.cc
68 src/cslib/CSSocket.cc68 src/cslib/CSObject.cc
69 src/cslib/CSStorage.cc69 src/cslib/CSPath.cc
70 src/cslib/CSStream.cc70 src/cslib/CSS3Protocol.cc
71 src/cslib/CSString.cc71 src/cslib/CSSha1.cc
72 src/cslib/CSStrUtil.cc72 src/cslib/CSSocket.cc
73 src/cslib/CSThread.cc73 src/cslib/CSStorage.cc
74 src/cslib/CSTime.cc74 src/cslib/CSStream.cc
75 src/cslib/CSUTF8.cc75 src/cslib/CSString.cc
76 src/cslib/CSXML.cc76 src/cslib/CSStrUtil.cc
77 src/cslib/CSSys_unix.cc77 src/cslib/CSThread.cc
7878 src/cslib/CSTime.cc
79headers=src/backup_ms.h79 src/cslib/CSUTF8.cc
80 src/cloud_ms.h80 src/cslib/CSXML.cc
81 src/compactor_ms.h81 src/cslib/CSSys_unix.cc
82 src/connection_handler_ms.h82 src/cslib/CSIdSpace.cc
83 src/cslib/CSConfig.h83
84 src/cslib/CSDefs.h84headers=src/cloud/cloud_ms.h
85 src/cslib/CSDirectory.h85 src/cslib/CSConfig.h
86 src/cslib/CSEncode.h86 src/cslib/CSDefs.h
87 src/cslib/CSException.h87 src/cslib/CSDirectory.h
88 src/cslib/CSFile.h88 src/cslib/CSEncode.h
89 src/cslib/CSGlobal.h89 src/cslib/CSException.h
90 src/cslib/CSHTTPStream.h90 src/cslib/CSFile.h
91 src/cslib/CSLog.h91 src/cslib/CSGlobal.h
92 src/cslib/CSMd5.h92 src/cslib/CSHTTPStream.h
93 src/cslib/CSMemory.h93 src/cslib/CSIdSpace.h
94 src/cslib/CSMutex.h94 src/cslib/CSLog.h
95 src/cslib/CSObject.h95 src/cslib/CSMd5.h
96 src/cslib/CSPath.h96 src/cslib/CSMemory.h
97 src/cslib/CSS3Protocol.h97 src/cslib/CSMutex.h
98 src/cslib/CSSha1.h98 src/cslib/CSObject.h
99 src/cslib/CSSocket.h99 src/cslib/CSPath.h
100 src/cslib/CSStorage.h100 src/cslib/CSS3Protocol.h
101 src/cslib/CSStream.h101 src/cslib/CSSha1.h
102 src/cslib/CSString.h102 src/cslib/CSSocket.h
103 src/cslib/CSStrUtil.h103 src/cslib/CSStorage.h
104 src/cslib/CSThread.h104 src/cslib/CSStream.h
105 src/cslib/CSTime.h105 src/cslib/CSString.h
106 src/cslib/CSUTF8.h106 src/cslib/CSStrUtil.h
107 src/cslib/CSXML.h107 src/cslib/CSSys.h
108 src/cslib/CSSys.h108 src/cslib/CSThread.h
109 src/database_ms.h109 src/cslib/CSTime.h
110 src/defs_ms.h110 src/cslib/CSUTF8.h
111 src/discover_ms.h111 src/cslib/CSXML.h
112 src/engine_ms.h112 src/daemon/pbmsdaemon_ms.h
113 src/events_ms.h113 src/database/database_ms.h
114 src/ha_pbms.h114 src/includes/defs_ms.h
115 src/metadata_ms.h115 src/includes/pbms_config.h
116 src/mysql_ms.h116 src/includes/pbms.h
117 src/network_ms.h117 src/includes/version_ms.h
118 src/open_table_ms.h118 src/lib/pbmslib.h
119 src/parameters_ms.h119 src/network/connection_handler_ms.h
120 src/pbmsdaemon_ms.h120 src/network/network_ms.h
121 src/pbms.h121 src/plugin/drizzle/events_ms.h
122 src/pbmslib.h122 src/plugin/engine_ms.h
123 src/repository_ms.h123 src/plugin/ha_pbms.h
124 src/systab_backup_ms.h124 src/plugin/parameters_ms.h
125 src/systab_cloud_ms.h125 src/repository/data/blob_repository_ms.h
126 src/systab_dump_ms.h126 src/repository/data/repository_file_ms.h
127 src/systab_httpheader_ms.h127 src/repository/data/repository_ms.h
128 src/systab_util_ms.h128 src/repository/index/repo_index_ms.h
129 src/systab_variable_ms.h129 src/repository/logs/temp_log_ms.h
130 src/system_table_ms.h130 src/repository/references/blob_refs.h
131 src/table_ms.h131 src/repository/references/bloburl.h
132 src/temp_log_ms.h132 src/repository/references/metadata_ms.h
133 src/transaction_ms.h133 src/repository/references/table_ms.h
134 src/trans_cache_ms.h134 src/system_tables/pbms_blob.h
135 src/trans_log_ms.h135 src/system_tables/pbms_cloud.h
136 src/version_ms.h136 src/system_tables/pbms_metadata.h
137 src/system_tables/pbms_metadata_header.h
138 src/system_tables/pbms_reference.h
139 src/system_tables/pbms_repository.h
140 src/system_tables/pbms_server.h
141 src/system_tables/systab_util_ms.h
142 src/system_tables/system_table_ms.h
143 src/transactions/trans_cache_ms.h
144 src/transactions/trans_log_ms.h
145 src/transactions/transaction_ms.h
146
147
137 148
138 149
139extra_dist=AUTHORS150extra_dist=AUTHORS
@@ -141,6 +152,4 @@
141 ChangeLog152 ChangeLog
142 README153 README
143 TODO154 TODO
144 src/api_ms.cc
145 src/metadata_ms.cc
146 src/cslib/CSSys_win.cc155 src/cslib/CSSys_win.cc
147156
=== removed file 'plugin/pbms/src/TransTest.cc'
--- plugin/pbms/src/TransTest.cc 2010-12-18 04:43:40 +0000
+++ plugin/pbms/src/TransTest.cc 1970-01-01 00:00:00 +0000
@@ -1,953 +0,0 @@
1/* Copyright (C) 2009 PrimeBase Technologies GmbH, Germany
2 *
3 * PrimeBase Media Stream for MySQL
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * Barry Leslie
20 *
21 * 2009-06-17
22 *
23 * H&G2JCtL
24 *
25 * PBMS transaction handling test driver.
26 *
27 * This is a test driver for the PBMS transaction log. It uses 2 tables in a database and
28 * inserts transaction records into 1 while writing them to the transaction log. The transaction
29 * log reader thread reads the transactions from the log and writes them to the second table.
30 * After a recovery the 2 tables should be identical.
31 *
32 * Built in crash points can be triggered to test that the recovery works correctly.
33 *
34 */
35
36#ifdef UNIT_TEST
37
38#include <stdlib.h>
39#include <stdio.h>
40#include <unistd.h>
41#include <string.h>
42#include <ctype.h>
43#include <inttypes.h>
44
45#include "cslib/CSConfig.h"
46#include "cslib/CSGlobal.h"
47#include "cslib/CSThread.h"
48#include "cslib/CSStrUtil.h"
49#include "cslib/CSStorage.h"
50
51#include "trans_cache_ms.h"
52#include "trans_log_ms.h"
53
54#include "mysql.h"
55
56#define CREATE_TABLE_BODY "\
57 (\
58 blob_ref INT NOT NULL AUTO_INCREMENT,\
59 tab_id INT NOT NULL,\
60 blob_id BIGINT NOT NULL, \
61 committed BOOLEAN NOT NULL DEFAULT 0, \
62 PRIMARY KEY (blob_ref, tab_id)\
63)\
64ENGINE = INNODB\
65"
66#ifdef LOG_TABLE
67#undef LOG_TABLE
68#endif
69
70#define LOG_TABLE "translog"
71#define REF_TABLE "transref_%d"
72#define MAX_THREADS 20
73
74#define A_DB_ID 123
75
76#define TEST_DATABASE_NAME "TransTest"
77static const char *user_name = "root";
78static const char *user_passwd = "";
79static int port = 3306;
80static const char *host = "localhost";
81static int nap_time = 1000;
82static int max_transaction = 10; // The maximum number of records generated per transaction
83static bool dump_log = false, overflow_crash = false;
84static int crash_site = 0; // The location to crash at.
85static int num_threads = 1; // The number of writer threads.
86//static int rate = 1000; // The maximum transactions per second to allow.
87static time_t timeout = 60; // How long to run for before crashing or shutting down.
88static bool revover_only = false;
89static bool recreate = false;
90
91static uint32_t cache_size = 0, log_size = 0;
92
93static MSTrans *trans_log;
94
95static CSThreadList *thread_list;
96
97static MYSQL *new_connection(bool check_for_db);
98
99static CSThread *main_thread;
100
101//------------------------------------------------
102class TransTestThread : public CSDaemon {
103public:
104 TransTestThread():
105 CSDaemon(thread_list),
106 count(0),
107 myActivity(0),
108 log(NULL),
109 stopit(false),
110 finished(false),
111 mysql(NULL)
112 {}
113
114 ~TransTestThread()
115 {
116 if (log)
117 log->release();
118
119 if (mysql)
120 mysql_close(mysql);
121 }
122
123 MSTrans *log;
124 MYSQL *mysql;
125 uint32_t count;
126 uint32_t myActivity;
127
128 bool stopit;
129 bool finished;
130
131 virtual bool doWork() {return true;}
132};
133
134//------------------------------------------------
135class TransTestWriterThread : public TransTestThread {
136public:
137 TransTestWriterThread():TransTestThread() {}
138
139 uint32_t tab_id;
140 FILE *myLog;
141
142 void generate_records();
143 bool doWork()
144 {
145 generate_records();
146 finished = true;
147 return true;
148 }
149
150 static TransTestWriterThread *newTransTestWriterThread(uint32_t id)
151 {
152 TransTestWriterThread *tt;
153 enter_();
154
155
156 new_(tt, TransTestWriterThread());
157
158 char name[32];
159 sprintf(name, "write_%d.log", id);
160 if (recreate)
161 tt->myLog = fopen(name, "w+");
162 else {
163 tt->myLog = fopen(name, "a+");
164 fprintf(tt->myLog, "====================================================\n");
165 }
166
167 tt->tab_id = id ;
168 tt->mysql = new_connection(false);
169 tt->log = trans_log;
170 trans_log->retain();
171
172 return_(tt);
173 }
174
175
176};
177
178//------------------------------------------------
179class TransTestReaderThread : public TransTestThread {
180public:
181 TransTestReaderThread():TransTestThread(){}
182
183 bool recovering;
184 void processTransactionLog();
185 bool doWork()
186 {
187 processTransactionLog();
188 return true;
189 }
190
191 static TransTestReaderThread *newTransTestReaderThread(MSTrans *log)
192 {
193 TransTestReaderThread *tt;
194 enter_();
195
196 new_(tt, TransTestReaderThread());
197 tt->mysql = new_connection(false);
198 tt->log = log;
199 tt->log->retain();
200
201 tt->log->txn_SetReader(tt); // The reader daemon is passed in unreferenced.
202 tt->recovering = false;
203 return_(tt);
204 }
205
206 bool rec_found(uint64_t id, uint32_t tab_id)
207 {
208 char stmt[100];
209 MYSQL_RES *results = NULL;
210 bool found;
211
212 sprintf(stmt, "SELECT blob_ref FROM "LOG_TABLE" WHERE blob_ref = %"PRIu64" AND tab_id = %"PRIu32"", id, tab_id);
213 if (mysql_query(mysql, stmt)) {
214 printf( "MySQL ERROR: %d \"%s\" line %d\n", mysql_errno(mysql), mysql_error(mysql), __LINE__);
215 printf("%s\n", stmt);
216 exit(1);
217 }
218
219
220 results = mysql_store_result(mysql);
221 if (!results){
222 printf( "MySQL ERROR: %d \"%s\" line %d\n", mysql_errno(mysql), mysql_error(mysql), __LINE__);
223 exit(1);
224 }
225
226 found = (mysql_num_rows(results) == 1);
227 mysql_free_result(results);
228
229 return found;
230
231 }
232
233
234};
235
236TransTestReaderThread *TransReader;
237//------------------------------------------------
238static void report_mysql_error(MYSQL *mysql, int line, const char *msg)
239{
240 printf( "MySQL ERROR: %d \"%s\" line %d\n", mysql_errno(mysql), mysql_error(mysql), line);
241 if (msg)
242 printf("%s\n", msg);
243 exit(1);
244}
245
246
247//------------------------------------------------
248static MYSQL *new_connection(bool check_for_db)
249{
250 MYSQL *mysql;
251
252 mysql = mysql_init(NULL);
253 if (!mysql) {
254 printf( "mysql_init() failed.\n");
255 exit(1);
256 }
257
258 if (mysql_real_connect(mysql, host, user_name, user_passwd, NULL, port, NULL, 0) == NULL)
259 report_mysql_error(mysql, __LINE__, "mysql_real_connect()");
260
261 if (check_for_db) {
262 MYSQL_RES *results = NULL;
263
264 if (mysql_query(mysql, "show databases like \"" TEST_DATABASE_NAME "\""))
265 report_mysql_error(mysql, __LINE__, "show databases like \"" TEST_DATABASE_NAME "\"");
266
267 results = mysql_store_result(mysql);
268 if (!results)
269 report_mysql_error(mysql, __LINE__, "mysql_store_result()");
270
271
272 if (mysql_num_rows(results) != 1) {
273 if (mysql_query(mysql, "create database " TEST_DATABASE_NAME ))
274 report_mysql_error(mysql, __LINE__, "create database " TEST_DATABASE_NAME );
275 }
276 mysql_free_result(results);
277 }
278
279 if (mysql_query(mysql, "use " TEST_DATABASE_NAME ))
280 report_mysql_error(mysql, __LINE__, "use " TEST_DATABASE_NAME );
281
282 return mysql;
283}
284
285//------------------------------------------------
286static void init_database(MYSQL *mysql, int cnt)
287{
288 char stmt[1024];
289
290 unlink("ms-trans-log.dat");
291 mysql_query(mysql, "drop table if exists " LOG_TABLE ";");
292
293 if (mysql_query(mysql, "create table " LOG_TABLE CREATE_TABLE_BODY ";")){
294 printf( "MySQL ERROR: %d \"%s\" line %d\n", mysql_errno(mysql), mysql_error(mysql), __LINE__);
295 exit(1);
296 }
297
298 while (cnt) {
299 sprintf(stmt, "drop table if exists " REF_TABLE ";", cnt);
300 mysql_query(mysql, stmt);
301 sprintf(stmt, "create table " REF_TABLE CREATE_TABLE_BODY ";", cnt);
302 if (mysql_query(mysql, stmt)){
303 printf( "MySQL ERROR: %d \"%s\" line %d\n", mysql_errno(mysql), mysql_error(mysql), __LINE__);
304 exit(1);
305 }
306 cnt--;
307 }
308}
309
310
311//------------------------------------------------
312static void display_help(const char *app)
313{
314 printf("\nUsage:\n");
315 printf("%s -help | -r [-t<num_threads>] | -d | [-n] [-sc <cache_size>] [-sl <log_size>] [-c <crash_site>] [-t<num_threads>] [<timeout>]\n\n", app);
316
317 printf("-r: Test recovery after a crash or shutdown.\n");
318 printf("-d: Dump the transaction log.\n");
319 printf("-n: Recreate the tables and recovery log.\n");
320 printf("-c <crash_site>: Crash at this location rather than shutting down. Max = %d\n", MAX_CRASH_POINT+1);
321 printf("-t<num_threads>: The number of writer threads to use, default is %d.\n", num_threads);
322 //printf("-r<rate>: The number af records to be inserted per second, default is %d.\n", rate);
323 printf("<timeout>: The number seconds the test should run before shuttingdown or crashing, default is %d.\n\n", timeout);
324 exit(1);
325}
326
327//---------------------------------
328static void process_args(int argc, const char * argv[])
329{
330 if (argc < 2)
331 return;
332
333 for (int i = 1; i < argc; ) {
334 if ( argv[i][0] != '-') { // Must be timeout
335 timeout = atoi(argv[i]);
336 i++;
337 if ((i != argc) || !timeout)
338 display_help(argv[0]);
339 } else {
340 switch (argv[i][1]) {
341 case 'h':
342 display_help(argv[0]);
343 break;
344
345 case 'r':
346 if (argc > 4 || argv[i][2])
347 display_help(argv[0]);
348 revover_only = true;
349 i++;
350 break;
351
352 case 'd':
353 if (argc != 2 || argv[i][2])
354 display_help(argv[0]);
355 dump_log = true;
356 i++;
357 break;
358
359 case 'n':
360 if (argv[i][2])
361 display_help(argv[0]);
362 recreate = true;
363 i++;
364 break;
365
366 case 'c':
367 if (argv[i][2])
368 display_help(argv[0]);
369 i++;
370 crash_site = atoi(argv[i]);
371 if (crash_site == (MAX_CRASH_POINT + 1))
372 overflow_crash = true;
373 else if ((!crash_site) || (crash_site > MAX_CRASH_POINT))
374 display_help(argv[0]);
375 i++;
376 break;
377
378 case 's': {
379 uint32_t size;
380
381 size = atol(argv[i+1]);
382 if (!size)
383 display_help(argv[0]);
384
385 if (argv[i][2] == 'c')
386 cache_size = size;
387 else if (argv[i][2] == 'l')
388 log_size = size;
389 else
390 display_help(argv[0]);
391
392 i+=2;
393 }
394 break;
395
396 case 't':
397 if (argv[i][2])
398 display_help(argv[0]);
399 i++;
400 num_threads = atoi(argv[i]);
401 if (!num_threads)
402 display_help(argv[0]);
403 i++;
404 break;
405/*
406 case 'r':
407 i++;
408 rate = atoi(argv[i]);
409 if (!rate)
410 display_help(argv[0]);
411 i++;
412 break;
413*/
414 default:
415 display_help(argv[0]);
416 }
417
418 }
419 }
420}
421
422//---------------------------------
423static void init_env()
424{
425 cs_init_memory();
426 CSThread::startUp();
427 if (!(main_thread = CSThread::newCSThread())) {
428 CSException::logOSError(CS_CONTEXT, ENOMEM);
429 exit(1);
430 }
431
432 CSThread::setSelf(main_thread);
433
434 enter_();
435 try_(a) {
436 trans_log = MSTrans::txn_NewMSTrans("./ms-trans-log.dat", /*dump_log*/ true);
437 new_(thread_list, CSThreadList());
438 }
439 catch_(a) {
440 self->logException();
441 CSThread::shutDown();
442 exit(1);
443 }
444 cont_(a);
445
446}
447//---------------------------------
448static void deinit_env()
449{
450 if (thread_list) {
451 thread_list->release();
452 thread_list = NULL;
453 }
454
455 if (trans_log) {
456 trans_log->release();
457 trans_log = NULL;
458 }
459
460 if (main_thread) {
461 main_thread->release();
462 main_thread = NULL;
463 }
464
465 CSThread::shutDown();
466 cs_exit_memory();
467}
468//---------------------------------
469static bool verify_database(MYSQL *mysql)
470{
471 MYSQL_RES **r_results, *l_results = NULL;
472 MYSQL_ROW r_record, l_record;
473 bool ok = false;
474 int i, log_row_cnt, ref_row_cnt = 0, tab_id;
475 char stmt[1024];
476
477 r_results = (MYSQL_RES **) malloc(num_threads * sizeof(MYSQL_RES *));
478
479 if (mysql_query(mysql, "select * from "LOG_TABLE" where committed = 0 order by blob_ref"))
480 report_mysql_error(mysql, __LINE__, "select * from "LOG_TABLE" order by blob_ref");
481
482 l_results = mysql_store_result(mysql);
483 if (!l_results)
484 report_mysql_error(mysql, __LINE__, "mysql_store_result()");
485
486 log_row_cnt = mysql_num_rows(l_results);
487 mysql_free_result(l_results);
488 if (log_row_cnt)
489 printf("Uncommitted references: %d\n", log_row_cnt);
490
491 //---------
492 for (i =0; i < num_threads; i++) {
493 sprintf(stmt, "select * from "REF_TABLE" order by blob_ref", i+1);
494 if (mysql_query(mysql, stmt))
495 report_mysql_error(mysql, __LINE__, stmt);
496
497 r_results[i] = mysql_store_result(mysql);
498 if (!r_results)
499 report_mysql_error(mysql, __LINE__, "mysql_store_result()");
500
501 ref_row_cnt += mysql_num_rows(r_results[i]);
502 }
503 //---------
504 if (mysql_query(mysql, "select * from "LOG_TABLE" order by blob_ref"))
505 report_mysql_error(mysql, __LINE__, "select * from "LOG_TABLE" order by blob_ref");
506
507 l_results = mysql_store_result(mysql);
508 if (!l_results)
509 report_mysql_error(mysql, __LINE__, "mysql_store_result()");
510
511 log_row_cnt = mysql_num_rows(l_results);
512
513 if (log_row_cnt != ref_row_cnt) {
514 if (ref_row_cnt > log_row_cnt) {
515 printf("verify_database() Failed: row count doesn't match: log_row_cnt(%d) != ref_row_cnt(%d)\n", log_row_cnt, ref_row_cnt);
516 goto done;
517 }
518
519 printf("verify_database() Warnning: row count doesn't match: log_row_cnt(%d) != ref_row_cnt(%d)\n", log_row_cnt, ref_row_cnt);
520 printf("Possible unreferenced BLOBs\n");
521 }
522
523 if (log_row_cnt == ref_row_cnt) {
524 for ( i = 0; i < log_row_cnt; i++) {
525 l_record = mysql_fetch_row(l_results);
526 tab_id = atol(l_record[1]);
527 r_record = mysql_fetch_row(r_results[tab_id-1]);
528 if ((atol(l_record[0]) != atol(r_record[0])) ||
529 (atol(l_record[1]) != atol(r_record[1])) ||
530 (atol(l_record[2]) != atol(r_record[2]))) {
531
532 printf("verify_database() Failed: in row %d, tab_id %d\n", i+1, tab_id);
533 printf("field 1: %d =? %d\n", atol(l_record[0]), atol(r_record[0]));
534 printf("field 2: %d =? %d\n", atol(l_record[1]), atol(r_record[1]));
535 printf("field 3: %d =? %d\n", atol(l_record[2]), atol(r_record[2]));
536 goto done;
537 }
538
539 }
540 } else { // The important thing is that there are no BLOBs in the ref tabels that are not in the log table.
541
542 for (i =0; i < num_threads; i++) {
543 mysql_free_result(r_results[i]);
544
545 sprintf(stmt, "select * from "REF_TABLE" where blob_ref not in (select blob_ref from TransTest.translog where tab_id = %d)", i+1, i+1);
546 if (mysql_query(mysql, stmt))
547 report_mysql_error(mysql, __LINE__, stmt);
548
549 r_results[i] = mysql_store_result(mysql);
550 if (!r_results)
551 report_mysql_error(mysql, __LINE__, "mysql_store_result()");
552
553 if (mysql_num_rows(r_results[i])) {
554 printf("verify_database() Failed, Missing BLOBs: %s\n", stmt);
555 goto done;
556 }
557 }
558 }
559
560 printf("verify_database() OK.\n");
561 ok = true;
562
563 done:
564
565 for (i =0; i < num_threads; i++) {
566 mysql_free_result(r_results[i]);
567 }
568 free(r_results);
569
570 mysql_free_result(l_results);
571
572#ifdef DEBUG
573 if (!ok) {
574 trans_log->txn_DumpLog("trace.log");
575 }
576#endif
577 return ok;
578}
579
580//------------------------------------------------
581void TransTestReaderThread::processTransactionLog()
582{
583 MSTransRec rec = {0};
584 MS_TxnState state;
585 char stmt[1024];
586 uint32_t last_tid = 0;
587 enter_();
588
589 // Read in transactions from the log and update
590 // the database table based on them.
591
592 try_(a) {
593 while (!myMustQuit && !stopit) {
594 // This will sleep while waiting for the next
595 // completed transaction.
596 log->txn_GetNextTransaction(&rec, &state);
597 if (myMustQuit)
598 break;
599
600 myActivity++;
601#ifdef CHECK_TIDS
602 if (num_threads == 1) {
603 ASSERT( ((last_tid + 1) == rec.tr_id) || (last_tid == rec.tr_id) || !last_tid);
604 last_tid = rec.tr_id;
605 }
606#endif
607 if (!recovering)
608 count++;
609
610 switch (TRANS_TYPE(rec.tr_type)) {
611 case MS_ReferenceTxn:
612 case MS_DereferenceTxn:
613 case MS_RollBackTxn:
614 case MS_CommitTxn:
615 case MS_RecoveredTxn:
616 break;
617 default:
618 printf("Unexpected transaction type: %d\n", rec.tr_type);
619 exit(1);
620 }
621
622 if (state == MS_Committed){
623 // Dereferences are applied when the transaction is commited.
624 // References are applied imediatly and removed if the transaction is rolled back.
625 if (TRANS_TYPE(rec.tr_type) == MS_DereferenceTxn) {
626 sprintf(stmt, "DELETE FROM "LOG_TABLE" WHERE blob_ref = %"PRIu64" AND tab_id = %d AND blob_id = %"PRIu64"", rec.tr_blob_ref_id, rec.tr_tab_id, rec.tr_blob_id);
627 if (mysql_query(mysql, stmt))
628 report_mysql_error(mysql, __LINE__, stmt);
629 } else if (TRANS_TYPE(rec.tr_type) == MS_ReferenceTxn) {
630 sprintf(stmt, "UPDATE "LOG_TABLE" SET committed = 1 WHERE blob_ref = %"PRIu64" AND tab_id = %d AND blob_id = %"PRIu64"", rec.tr_blob_ref_id, rec.tr_tab_id, rec.tr_blob_id);
631 if (mysql_query(mysql, stmt))
632 report_mysql_error(mysql, __LINE__, stmt);
633 }
634 } else if (state == MS_RolledBack) {
635 //printf("ROLLBACK!\n");
636 if (TRANS_TYPE(rec.tr_type) == MS_ReferenceTxn) {
637 sprintf(stmt, "DELETE FROM "LOG_TABLE" WHERE blob_ref = %"PRIu64" AND tab_id = %d AND blob_id = %"PRIu64"", rec.tr_blob_ref_id, rec.tr_tab_id, rec.tr_blob_id);
638 if (mysql_query(mysql, stmt))
639 report_mysql_error(mysql, __LINE__, stmt);
640 }
641 } else if (state == MS_Recovered) {
642 printf("Recovered transaction being ignored:\n");
643 printf("blob_ref = %"PRIu64", tab_id = %d, blob_id = %"PRIu64"\n\n", rec.tr_blob_ref_id, rec.tr_tab_id, rec.tr_blob_id);
644 } else {
645 printf("Unexpected transaction state: %d\n", state);
646 exit(1);
647 }
648
649
650 }
651 }
652 catch_(a) {
653 self->logException();
654 printf("\n\n!!!!!!!! THE TRANSACTION LOG READER DIED! !!!!!!!!!!!\n\n");
655 if (!myMustQuit && !stopit)
656 exit(1);
657 }
658 cont_(a);
659 printf("The transaction log reader shutting down.\n");
660 exit_();
661}
662
663//------------------------------------------------
664void TransTestWriterThread::generate_records()
665{
666
667 MS_Txn txn_type;
668 uint64_t blob_id;
669 uint64_t blob_ref_id;
670 int tsize, i;
671 bool do_delete;
672
673 char stmt[1024];
674 enter_();
675
676 try_(a) {
677 while (!myMustQuit && !stopit) {
678
679 myActivity++;
680 usleep(nap_time); // Give up a bit of time
681 if (myMustQuit || stopit)
682 break;
683
684 tsize = rand() % max_transaction;
685
686 if (mysql_autocommit(mysql, 0))
687 report_mysql_error(mysql, __LINE__, "mysql_autocommit()");
688
689 i = 0;
690 do {
691 do_delete = ((rand() %2) == 0);
692
693 // decide if this is an insert or delete
694 if (do_delete) {
695 MYSQL_RES *results = NULL;
696 MYSQL_ROW record;
697 int cnt;
698
699 // If we are deleting then randomly select a record to delete
700 // and delete it.
701
702 txn_type = MS_DereferenceTxn;
703
704 sprintf(stmt, "select * from "REF_TABLE, tab_id);
705 if (mysql_query(mysql, stmt))
706 report_mysql_error(mysql, __LINE__, stmt);
707
708 results = mysql_store_result(mysql);
709 if (!results)
710 report_mysql_error(mysql, __LINE__, "mysql_store_result()");
711
712 cnt = mysql_num_rows(results);
713 if (!cnt)
714 do_delete = false; // There is nothing to delete
715 else {
716 mysql_data_seek(results, rand()%cnt);
717 record = mysql_fetch_row(results);
718
719 blob_ref_id = atol(record[0]);
720 blob_id = atol(record[2]);
721
722 sprintf(stmt, "DELETE FROM "REF_TABLE" WHERE blob_ref = %"PRIu64" AND blob_id = %"PRIu64"", tab_id, blob_ref_id, blob_id);
723 if (mysql_query(mysql, stmt))
724 report_mysql_error(mysql, __LINE__, stmt);
725
726 if (mysql_affected_rows(mysql) == 0)
727 do_delete = false; // Another thread must have deleted the row first.
728 else
729 fprintf(myLog, "DELETE %"PRIu64" %"PRIu64"\n", blob_ref_id, blob_id);
730 }
731
732 mysql_free_result(results);
733 }
734
735 if (!do_delete) {
736 blob_id = self->myTID; // Assign the tid as the blob id to help with debugging.
737 txn_type = MS_ReferenceTxn;
738
739 sprintf(stmt, "INSERT INTO "REF_TABLE" VALUES( NULL, %d, %"PRIu64", 0)", tab_id, tab_id, blob_id);
740 if (mysql_query(mysql, stmt))
741 report_mysql_error(mysql, __LINE__, stmt);
742
743 blob_ref_id = mysql_insert_id(mysql);
744 if (!blob_ref_id)
745 report_mysql_error(mysql, __LINE__, "mysql_insert_id() returned 0");
746
747 fprintf(myLog, "INSERT %"PRIu64" %"PRIu64"\n", blob_ref_id, blob_id);
748 // Apply the blob reference now. This will be undone if the transaction is rolled back.
749 sprintf(stmt, "INSERT INTO "LOG_TABLE" VALUES(%"PRIu64", %d, %"PRIu64", 0)", blob_ref_id, tab_id, blob_id);
750 if (mysql_query(mysql, stmt))
751 report_mysql_error(mysql, __LINE__, stmt);
752 }
753
754 i++;
755 count++;
756 if (i >= tsize) { //Commit the database transaction before the log transaction.
757 bool rollback;
758
759 rollback = ((tsize > 0) && ((rand() % 1000) == 0));
760 if (rollback) {
761 printf("Rollback\n");
762 if (mysql_rollback(mysql)) // commit the staement to the database,
763 report_mysql_error(mysql, __LINE__, "mysql_rollback()");
764 fprintf(myLog, "Rollback %"PRIu32"\n", self->myTID);
765 log->txn_LogTransaction(MS_RollBackTxn);
766 } else {
767 if (mysql_commit(mysql)) // commit the staement to the database,
768 report_mysql_error(mysql, __LINE__, "mysql_commit()");
769 fprintf(myLog, "Commit %"PRIu32"\n", self->myTID);
770 log->txn_LogTransaction(txn_type, true, A_DB_ID, tab_id, blob_id, blob_ref_id);
771 }
772 } else
773 log->txn_LogTransaction(txn_type, false, A_DB_ID, tab_id, blob_id, blob_ref_id);
774
775 } while ( i < tsize);
776
777 }
778 }
779
780 catch_(a) {
781 self->logException();
782 printf("\n\nA writer thread for table %d died! \n\n", tab_id);
783 if (i == tsize) {
784 printf(" It is possible that the last %d operations on table %d were committed to the database but not to the log.\n", tsize, tab_id);
785 }
786 if (!myMustQuit && !stopit)
787 exit(1);
788 }
789 cont_(a);
790 printf("Writer thread for table %d is shutting down.\n", tab_id);
791 exit_();
792}
793
794// SELECT * FROM TransTest.translog where blob_ref not in (select blob_ref from TransTest.transref)
795// SELECT * FROM TransTest.transref_1 where blob_ref not in (select blob_ref from TransTest.translog where tab_id = 1)
796// SELECT * FROM TransTest.translog where tab_id = 1 AND blob_ref not in (select blob_ref from TransTest.transref_1)
797// select count(*) from TransTest.translog where committed = 1
798//---------------------------------
799int main (int argc, const char * argv[])
800{
801 MYSQL *mysql;
802 TransTestWriterThread **writer = NULL;
803 int rtc = 1;
804
805 process_args(argc, argv);
806
807 mysql = new_connection(true);
808
809 if (recreate)
810 init_database(mysql, num_threads);
811
812 init_env();
813 enter_();
814
815 if (dump_log) {
816 printf("LOG dumped\n");
817 exit(1);
818 }
819
820 TransReader = TransTestReaderThread::newTransTestReaderThread(trans_log);
821 push_(TransReader);
822 TransReader->recovering = true;
823 TransReader->start();
824
825 // wait until the recovery is complete.
826 while (trans_log->txn_GetNumRecords())
827 usleep(100);
828
829 TransReader->recovering = false;
830
831 if (log_size)
832 trans_log->txn_SetLogSize(log_size);
833
834 if (cache_size)
835 trans_log->txn_SetCacheSize(cache_size);
836
837 if (revover_only) {
838 TransReader->stopit = true;
839 if (verify_database(mysql))
840 rtc = 0;
841 goto done;
842 }
843
844 try_(a) {
845 writer = (TransTestWriterThread **) cs_malloc(num_threads * sizeof(TransTestWriterThread *));
846 for (int i = 0; i < num_threads; i++) {
847 TransTestWriterThread *wt = TransTestWriterThread::newTransTestWriterThread(i+1);
848 wt->start();
849 writer[i] = wt;
850 }
851
852 printf("Timeout: %d seconds\n", timeout);
853 timeout += time(NULL);
854 int header = 0;
855 while (timeout > time(NULL)) {
856 MSTransStatsRec stats;
857 self->sleep(1000);
858 trans_log->txn_GetStats(&stats);
859
860
861 if (!(header%20)) {
862 for (int i = 0; i < num_threads; i++) {
863 if (writer[i]->myActivity == 0) {
864 printf("Writer thread %d HUNG!!!\n", i);
865 }
866 writer[i]->myActivity = 0;
867 }
868
869 if (TransReader->myActivity == 0) {
870 printf("Reader thread HUNG!!!\n");
871 }
872 TransReader->myActivity = 0;
873
874 printf("%s | %s | %s | %s | %s | %s | %s | %s\n", "LogSize", "Full", "MaxSize", "Overflows", "Overflowing", "CacheSize", "Cache Used", "Cache Hit");
875 }
876 header++;
877 //printf("Writes: %d \t\t Reads: %d \t%d \t start: %"PRIu64"\t\t eol:%"PRIu64"\n", count, TransReader->count, count - TransReader->count, trans_log->txn_Start, trans_log->txn_EOL);
878 printf("%7llu | %3d%% | %7llu | %9d | %11s | %9d | %9d%% | %9d%%\n",// | \t\t\t%"PRIu64" \t%"PRIu64"\n",
879 stats.ts_LogSize,
880 stats.ts_PercentFull,
881 stats.ts_MaxSize,
882 stats.ts_OverflowCount,
883 (stats.ts_IsOverflowing)?"Over Flow": " --- ",
884 stats.ts_TransCacheSize,
885 stats.ts_PercentTransCacheUsed,
886 stats.ts_PercentCacheHit//, trans_log->txn_Start, trans_log->txn_EOL
887 );
888
889 if (stats.ts_IsOverflowing && overflow_crash) {
890 printf("Simulating crash while in overflow\n");
891 exit(1);
892 }
893 }
894
895#ifdef CRASH_TEST
896 if (crash_site) {
897 printf("Crashing at crash site %d\n", crash_site);
898 trans_test_crash_point = crash_site;
899 // set the crash site and wait to die.
900 while(1)
901 self->sleep(1000);
902 }
903#endif
904
905 printf("Shutting down the writer threads:\n");
906 for (int i = 0; i < num_threads; i++) {
907 writer[i]->stopit = true;
908 }
909
910 TransReader->stopit = true;
911 // Give the writers a chance to shutdown by themselves.
912 int cnt = 100;
913 while (cnt) {
914 int i;
915 for (i = 0; i < num_threads && writer[i]->finished; i++);
916 if (i == num_threads && TransReader->finished)
917 break;
918 self->sleep(10);
919 cnt--;
920 }
921
922 for (int i = 0; i < num_threads; i++) {
923 writer[i]->stop();
924 }
925
926 }
927 rtc = 0;
928 catch_(a) {
929 printf("Main thread abort.\n");
930 self->logException();
931 }
932 cont_(a);
933 if (writer) {
934 for (int i = 0; i < num_threads; i++) {
935 writer[i]->stop();
936 writer[i]->release();
937 }
938 cs_free(writer);
939 }
940
941done:
942 TransReader->stop();
943 release_(TransReader);
944
945 outer_();
946
947 thread_list->stopAllThreads();
948 deinit_env();
949 mysql_close(mysql);
950 exit(rtc);
951}
952
953#endif // UNIT_TEST
9540
=== removed file 'plugin/pbms/src/alias_ms.cc'
--- plugin/pbms/src/alias_ms.cc 2010-12-18 04:43:40 +0000
+++ plugin/pbms/src/alias_ms.cc 1970-01-01 00:00:00 +0000
@@ -1,1092 +0,0 @@
1/* Copyright (C) 2008 PrimeBase Technologies GmbH, Germany
2 *
3 * PrimeBase Media Stream for MySQL
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * Barry Leslie
20 *
21 * 2008-12-30
22 *
23 * H&G2JCtL
24 *
25 * BLOB alias index.
26 *
27 */
28
29#ifdef HAVE_ALIAS_SUPPORT
30#include "cslib/CSConfig.h"
31
32#include "string.h"
33
34#ifdef DRIZZLED
35#include <drizzled/common.h>
36#endif
37
38#include "cslib/CSGlobal.h"
39#include "cslib/CSLog.h"
40#include "cslib/CSStrUtil.h"
41#include "cslib/CSFile.h"
42#include "system_table_ms.h"
43#include "database_ms.h"
44
45#include "alias_ms.h"
46
47
48
49//------------------------
50MSAlias::~MSAlias()
51{
52 enter_();
53
54 ASSERT(iClosing);
55 ASSERT(iPoolSysTables.getSize() == 0);
56
57
58 if (iFilePath) {
59
60 if (iDelete)
61 iFilePath->removeFile();
62
63 iFilePath->release();
64 }
65
66 if (iFileShare)
67 iFileShare->release();
68
69 exit_();
70}
71
72//------------------------
73MSAlias::MSAlias(MSDatabase *db_noref)
74{
75 iClosing = false;
76 iDelete = false;
77 iDatabase_br = db_noref;
78 iFilePath = NULL;
79 iFileShare = NULL;
80}
81
82//------------------------
83void MSAlias::ma_close()
84{
85 enter_();
86
87 iClosing = true;
88 if (iFileShare)
89 iFileShare->close();
90 iPoolSysTables.clear();
91 exit_();
92}
93
94//------------------------
95// Compress the index bucket chain and free unused buckets.
96void MSAlias::MSAliasCompress(CSFile *fa, CSSortedList *freeList, MSABucketLinkedList *bucketChain)
97{
98 // For now I will just remove empty buckets.
99 // Later this function should also compress the records also
100 // thus making the searches faster and freeing up more space.
101 MSABucketInfo *b_info, *next;
102
103 b_info = bucketChain->getFront();
104 while (b_info) {
105 next = b_info->getNextLink();
106 if (b_info->getSize() == 0) {
107 bucketChain->remove(RETAIN(b_info));
108 freeList->add(b_info);
109 }
110 b_info = next;
111 }
112
113}
114
115//------------------------
116void MSAlias::MSAliasLoad()
117{
118 CSFile *fa = NULL;
119 CSSortedList freeList;
120 off64_t fileSize;
121
122 enter_();
123
124 fa = CSFile::newFile(RETAIN(iFilePath));
125 push_(fa);
126
127 MSAliasHeadRec header;
128 uint64_t free_list_offset;
129 fa->open(CSFile::DEFAULT);
130 fa->read(&header, 0, sizeof(header), sizeof(header));
131
132 /* Check the file header: */
133 if (CS_GET_DISK_4(header.ah_magic_4) != MS_ALIAS_FILE_MAGIC)
134 CSException::throwFileError(CS_CONTEXT, iFilePath->getCString(), CS_ERR_BAD_HEADER_MAGIC);
135 if (CS_GET_DISK_2(header.ah_version_2) != MS_ALIAS_FILE_VERSION)
136 CSException::throwFileError(CS_CONTEXT, iFilePath->getCString(), CS_ERR_VERSION_TOO_NEW);
137
138 free_list_offset = CS_GET_DISK_8(header.ah_free_list_8);
139
140 fileSize = CS_GET_DISK_8(header.ah_file_size_8);
141
142 // Do some sanity checks.
143 if (CS_GET_DISK_2(header.ah_head_size_2) != sizeof(header))
144 CSException::throwFileError(CS_CONTEXT, iFilePath->getCString(), CS_ERR_BAD_FILE_HEADER);
145
146 if (CS_GET_DISK_2(header.ah_num_buckets_2) != BUCKET_LIST_SIZE)
147 CSException::throwFileError(CS_CONTEXT, iFilePath->getCString(), CS_ERR_BAD_FILE_HEADER);
148
149 if (CS_GET_DISK_4(header.ah_bucket_size_4) != NUM_RECORDS_PER_BUCKET)
150 CSException::throwFileError(CS_CONTEXT, iFilePath->getCString(), CS_ERR_BAD_FILE_HEADER);
151
152 if (fileSize != fa->getEOF())
153 CSException::throwFileError(CS_CONTEXT, iFilePath->getCString(), CS_ERR_BAD_FILE_HEADER);
154
155 // Load the bucket headers into RAM
156 MSADiskBucketHeadRec bucketHead = {0};
157 uint64_t offset, start_offset;
158
159 // Fist load the free list:
160 if (free_list_offset) {
161 start_offset = offset = free_list_offset;
162 do {
163 fa->read(&bucketHead, offset, sizeof(MSADiskBucketHeadRec), sizeof(MSADiskBucketHeadRec));
164 freeList.add(MSABucketInfo::newMSABucketInfo(offset));
165 offset = CS_GET_DISK_8(bucketHead.ab_next_bucket_8);
166 } while (offset != start_offset);
167
168 }
169 for (uint32_t i = 0; i < BUCKET_LIST_SIZE; i++) {
170 uint64_t used, total_space;
171 MSABucketLinkedList *bucketChain = &(iFileShare->msa_buckets[i]);
172
173 start_offset = offset = sizeof(header) + i * sizeof(MSADiskBucketRec);
174 used = total_space = 0;
175 do {
176 uint32_t num, end_of_records;
177
178 fa->read(&bucketHead, offset, sizeof(MSADiskBucketHeadRec), sizeof(MSADiskBucketHeadRec));
179 num = CS_GET_DISK_4(bucketHead.ab_num_recs_4);
180 end_of_records = CS_GET_DISK_4(bucketHead.ab_eor_rec_4);
181 total_space += NUM_RECORDS_PER_BUCKET;
182 used += num;
183 bucketChain->addFront(MSABucketInfo::newMSABucketInfo(offset, num, end_of_records));
184 offset = CS_GET_DISK_8(bucketHead.ab_next_bucket_8);
185
186 } while (offset != start_offset);
187
188 // Pack the index if required
189 if (((total_space - used) / NUM_RECORDS_PER_BUCKET) > 1)
190 MSAliasCompress(fa, &freeList, bucketChain);
191
192 }
193
194 // If there are free buckets try to free up some disk
195 // space or add them to a free list to be reused later.
196 if (freeList.getSize()) {
197 uint64_t last_bucket = fileSize - sizeof(MSADiskBucketRec);
198 MSABucketInfo *rec;
199 bool reduce = false;
200
201 // Search for freed buckets at the end of the file
202 // so that they can be released and the file
203 // shrunk.
204 //
205 // The free list has been sorted so that buckets
206 // with the highest file offset are first.
207 do {
208 rec = (MSABucketInfo*) freeList.itemAt(0);
209 if (rec->bi_bucket_offset != last_bucket);
210 break;
211
212 last_bucket -= sizeof(MSADiskBucketRec);
213 freeList.remove(rec);
214 reduce = true;
215 } while (freeList.getSize());
216
217 if (reduce) {
218 // The file can be reduced in size.
219 fileSize = last_bucket + sizeof(MSADiskBucketRec);
220 fa->setEOF(fileSize);
221 CS_SET_DISK_8(header.ah_file_size_8, fileSize);
222 fa->write(&header.ah_file_size_8, offsetof(MSAliasHeadRec,ah_file_size_8) , 8);
223 }
224
225 // Add the empty buckets to the index file's empty bucket list.
226 memset(&bucketHead, 0, sizeof(bucketHead));
227 offset = 0;
228 while (freeList.getSize()) { // Add the empty buckets to the empty_bucket list.
229 rec = (MSABucketInfo*) freeList.takeItemAt(0);
230
231 // buckets are added to the front of the list.
232 fa->write(&offset, rec->bi_bucket_offset + offsetof(MSADiskBucketHeadRec,ab_next_bucket_8) , 8);
233 offset = rec->bi_bucket_offset;
234 fa->write(&offset, offsetof(MSAliasHeadRec,ah_free_list_8) , 8);
235
236 iFileShare->msa_empty_buckets.addFront(rec);
237 }
238 }
239
240 iFileShare->msa_fileSize = fa->getEOF();
241
242 release_(fa);
243 exit_();
244}
245
246//------------------------
247void MSAlias::buildAliasIndex()
248{
249 MSBlobHeadRec blob;
250 MSRepository *repo;
251 uint64_t blob_size, fileSize, offset;
252 uint16_t head_size;
253 MSAliasFile *afile;
254 MSAliasRec aliasRec;
255
256 enter_();
257
258 afile = getAliasFile();
259 frompool_(afile);
260
261 afile->startLoad();
262
263 CSSyncVector *repo_list = iDatabase_br->getRepositoryList();
264
265 // No locking is required since the index is loaded before the database is opened
266 // and the compactor thread is started.
267
268 for (uint32_t repo_index =0; repo_index<repo_list->size(); repo_index++) {
269 if ((repo = (MSRepository *) repo_list->get(repo_index))) {
270 MSRepoFile *repoFile = repo->openRepoFile();
271 push_(repoFile);
272 fileSize = repo->getRepoFileSize();
273 offset = repo->getRepoHeadSize();
274
275 aliasRec.repo_id = repoFile->myRepo->getRepoID();
276
277 while (offset < fileSize) {
278 if (repoFile->read(&blob, offset, sizeof(MSBlobHeadRec), 0) < sizeof(MSBlobHeadRec))
279 break;
280
281 if ((CS_GET_DISK_1(blob.rb_status_1) == MS_BLOB_REFERENCED) && CS_GET_DISK_2(blob.rb_alias_offset_2)) {
282 aliasRec.repo_offset = offset;
283 aliasRec.alias_hash = CS_GET_DISK_4(blob.rb_alias_hash_4);
284 addAlias(afile, &aliasRec);
285 }
286
287 head_size = CS_GET_DISK_2(blob.rb_head_size_2);
288 blob_size = CS_GET_DISK_6(blob.rb_blob_repo_size_6);
289 offset += head_size + blob_size;
290 }
291
292 release_(repoFile);
293 }
294 }
295
296 afile->finishLoad();
297 backtopool_(afile);
298
299 exit_();
300}
301
302//------------------------
303void MSAlias::MSAliasBuild()
304{
305 CSFile *fa;
306 MSAliasHeadRec header = {0};
307 uint64_t offset, size = sizeof(header) + BUCKET_LIST_SIZE * sizeof(MSADiskBucketRec);
308 enter_();
309
310 fa = CSFile::newFile(RETAIN(iFilePath));
311 push_(fa);
312
313 fa->open(CSFile::CREATE | CSFile::TRUNCATE);
314
315 // Create an empty index with 1 empty bucket in each bucket chain.
316
317 CS_SET_DISK_4(header.ah_magic_4, MS_ALIAS_FILE_MAGIC);
318 CS_SET_DISK_2(header.ah_version_2, MS_ALIAS_FILE_VERSION);
319
320 CS_SET_DISK_2(header.ah_head_size_2, sizeof(header));
321 CS_SET_DISK_8(header.ah_file_size_8, size);
322
323 CS_SET_DISK_2(header.ah_num_buckets_2, BUCKET_LIST_SIZE);
324 CS_SET_DISK_2(header.ah_bucket_size_4, NUM_RECORDS_PER_BUCKET);
325
326 fa->setEOF(size); // Grow the file.
327 fa->write(&header, 0, sizeof(header));
328
329 offset = sizeof(header);
330
331 // Initialize the file bucket chains.
332 MSADiskBucketHeadRec bucketHead = {0};
333 for (uint32_t i = 0; i < BUCKET_LIST_SIZE; i++) {
334 CS_SET_DISK_8(bucketHead.ab_prev_bucket_8, offset);
335 CS_SET_DISK_8(bucketHead.ab_next_bucket_8, offset);
336 fa->write(&bucketHead, offset, sizeof(MSADiskBucketHeadRec));
337 // Add the bucket to the RAM based list.
338 iFileShare->msa_buckets[i].addFront(MSABucketInfo::newMSABucketInfo(offset));
339 offset += sizeof(MSADiskBucketRec); // NOTE: MSADiskBucketRec not MSADiskBucketHeadRec
340 }
341
342 fa->sync();
343
344
345
346 fa->close();
347
348 release_(fa);
349
350 // Scan through all the BLOBs in the repository and add an entry
351 // for each blob alias.
352 buildAliasIndex();
353
354 exit_();
355}
356
357//------------------------
358void MSAlias::ma_open(const char *file_name)
359{
360 bool isdir = false;
361
362 enter_();
363
364 iFilePath = CSPath::newPath(RETAIN(iDatabase_br->myDatabasePath), file_name);
365
366retry:
367 new_(iFileShare, MSAliasFileShare(RETAIN(iFilePath)));
368
369 if (iFilePath->exists(&isdir)) {
370 try_(a) {
371 MSAliasLoad();
372 }
373 catch_(a) {
374 // If an error occurs delete the index and rebuild it.
375 self->myException.log(NULL);
376 iFileShare->release();
377 iFilePath->removeFile();
378 goto retry;
379 }
380 cont_(a);
381 } else
382 MSAliasBuild();
383
384
385 exit_();
386}
387
388//------------------------
389uint32_t MSAlias::hashAlias(const char *ptr)
390{
391 register uint32_t h = 0, g;
392
393 while (*ptr) {
394 h = (h << 4) + (uint32_t) toupper(*ptr++);
395 if ((g = (h & 0xF0000000)))
396 h = (h ^ (g >> 24)) ^ g;
397 }
398
399 return (h);
400}
401
402//------------------------
403void MSAlias::addAlias(MSAliasFile *af, MSAliasRec *rec)
404{
405 MSDiskAliasRec diskRec;
406 CS_SET_DISK_4(diskRec.ar_repo_id_4, rec->repo_id);
407 CS_SET_DISK_8(diskRec.ar_offset_8, rec->repo_offset);
408 CS_SET_DISK_4(diskRec.ar_hash_4, rec->alias_hash);
409 af->addRec(&diskRec);
410
411}
412
413//------------------------
414uint32_t MSAlias::addAlias(uint32_t repo_id, uint64_t repo_offset, const char *alias)
415{
416 MSDiskAliasRec diskRec;
417 uint32_t hash;
418 uint32_t f_repo_id;
419 uint64_t f_repo_offset;
420 bool referenced = false;
421 enter_();
422
423 hash = hashAlias(alias);
424
425 // Use a lock to make sure that the same alias cannot be added at the same time.
426 lock_(this);
427
428 MSAliasFile *af = getAliasFile();
429 frompool_(af);
430
431 if (findBlobByAlias(RETAIN(af), alias, &referenced, &f_repo_id, &f_repo_offset)) {
432 if ((f_repo_id == repo_id) && (f_repo_offset == repo_offset))
433 goto done; // Do not treat this as an error.
434 if (!referenced) {
435 // If the alias is in use by a non referenced BLOB then delete it.
436 // This can happen because I allow newly created BLOBs to be accessed
437 // by their alias even before a reference to the BLOB has been added to
438 // the database.
439 af->deleteCurrentRec();
440 } else {
441#ifdef xxDEBUG
442 CSL.log(self, CSLog::Protocol, "Alias: ");
443 CSL.log(self, CSLog::Protocol, alias);
444 CSL.log(self, CSLog::Protocol, "\n");
445#endif
446 CSException::throwException(CS_CONTEXT, MS_ERR_DUPLICATE, "Alias Exists");
447 }
448 }
449
450 CS_SET_DISK_4(diskRec.ar_repo_id_4, repo_id);
451 CS_SET_DISK_8(diskRec.ar_offset_8, repo_offset);
452 CS_SET_DISK_4(diskRec.ar_hash_4, hash);
453
454 af->addRec(&diskRec);
455done:
456 backtopool_(af);
457
458 unlock_(this);
459 return_(hash);
460}
461
462//------------------------
463void MSAlias::deleteAlias(MSDiskAliasPtr diskRec)
464{
465 enter_();
466
467 MSAliasFile *af = getAliasFile();
468 frompool_(af);
469 if (af->findRec(diskRec))
470 af->deleteCurrentRec();
471 backtopool_(af);
472
473 exit_();
474}
475
476//------------------------
477void MSAlias::deleteAlias(uint32_t repo_id, uint64_t repo_offset, uint32_t alias_hash)
478{
479 MSDiskAliasRec diskRec;
480
481 CS_SET_DISK_4(diskRec.ar_repo_id_4, repo_id);
482 CS_SET_DISK_8(diskRec.ar_offset_8, repo_offset);
483 CS_SET_DISK_4(diskRec.ar_hash_4, alias_hash);
484 deleteAlias(&diskRec);
485
486}
487//------------------------
488void MSAlias::resetAlias(uint32_t old_repo_id, uint64_t old_repo_offset, uint32_t alias_hash, uint32_t new_repo_id, uint64_t new_repo_offset)
489{
490 MSDiskAliasRec diskRec;
491 bool found;
492 enter_();
493
494 CS_SET_DISK_4(diskRec.ar_repo_id_4, old_repo_id);
495 CS_SET_DISK_8(diskRec.ar_offset_8, old_repo_offset);
496 CS_SET_DISK_4(diskRec.ar_hash_4, alias_hash);
497
498 lock_(this);
499
500 MSAliasFile *af = getAliasFile();
501 frompool_(af);
502 found = af->findRec(&diskRec);
503 CS_SET_DISK_4(diskRec.ar_repo_id_4, new_repo_id);
504 CS_SET_DISK_8(diskRec.ar_offset_8, new_repo_offset);
505
506 if (found)
507 af->updateCurrentRec(&diskRec);
508 else {
509 CSException::logException(CS_CONTEXT, MS_ERR_NOT_FOUND, "Alias doesn't exists");
510 af->addRec(&diskRec);
511 }
512
513 backtopool_(af);
514
515 unlock_(this);
516 exit_();
517}
518
519//------------------------
520// Check to see if the blob with the given repo_id
521// and repo_offset has the specified alias.
522bool MSAlias::hasBlobAlias(uint32_t repo_id, uint64_t repo_offset, const char *alias, bool *referenced)
523{
524 bool found = false;
525 MSRepoFile *repoFile;
526 MSBlobHeadRec blob;
527 uint8_t status;
528 uint64_t offset;
529 uint32_t alias_size = strlen(alias) +1;
530 char blob_alias[BLOB_ALIAS_LENGTH +1];
531
532 if (alias_size > BLOB_ALIAS_LENGTH)
533 return false;
534
535 enter_();
536
537 repoFile = iDatabase_br->getRepoFileFromPool(repo_id, false);
538 frompool_(repoFile);
539
540 repoFile->read(&blob, repo_offset, sizeof(MSBlobHeadRec), sizeof(MSBlobHeadRec));
541 status = CS_GET_DISK_1(blob.rb_status_1);
542 if (IN_USE_BLOB_STATUS(status)) {
543 offset = repo_offset + CS_GET_DISK_2(blob.rb_alias_offset_2);
544
545 blob_alias[BLOB_ALIAS_LENGTH] = 0;
546 if (repoFile->read(blob_alias, offset, alias_size, 0) == alias_size) {
547 found = !my_strcasecmp(&my_charset_utf8_general_ci, blob_alias, alias);
548 if (found)
549 *referenced = (status == MS_BLOB_REFERENCED);
550 }
551 } else {
552 CSException::logException(CS_CONTEXT, MS_ERR_ENGINE, "Deleted BLOB alias found. (Rebuild BLOB alias index.)");
553 }
554
555
556 backtopool_(repoFile);
557
558 return_(found);
559}
560
561//------------------------
562bool MSAlias::findBlobByAlias( MSAliasFile *af, const char *alias, bool *referenced, uint32_t *repo_id, uint64_t *repo_offset)
563{
564 bool found = false;
565 uint32_t hash, l_repo_id, l_repo_offset;
566 MSDiskAliasPtr diskRec;
567 enter_();
568
569 push_(af);
570
571 hash = hashAlias(alias);
572 diskRec = af->findRec(hash);
573
574 while (diskRec && !found) {
575 l_repo_id = CS_GET_DISK_4(diskRec->ar_repo_id_4);
576 l_repo_offset = CS_GET_DISK_8(diskRec->ar_offset_8);
577 if (hasBlobAlias(l_repo_id, l_repo_offset, alias, referenced))
578 found = true;
579 else
580 diskRec = af->nextRec();
581 }
582
583 if (found) {
584 if (repo_id)
585 *repo_id = l_repo_id;
586
587 if (repo_offset)
588 *repo_offset = l_repo_offset;
589 }
590
591 release_(af);
592 return_(found);
593}
594//------------------------
595bool MSAlias::findBlobByAlias( const char *alias, bool *referenced, uint32_t *repo_id, uint64_t *repo_offset)
596{
597 bool found;
598 enter_();
599
600 MSAliasFile *af = getAliasFile();
601 frompool_(af);
602
603 found = findBlobByAlias(RETAIN(af), alias, referenced, repo_id, repo_offset);
604
605 backtopool_(af);
606 return_(found);
607}
608
609//------------------------
610bool MSAlias::blobAliasExists(uint32_t repo_id, uint64_t repo_offset, uint32_t alias_hash)
611{
612 bool found;
613 MSDiskAliasRec diskRec;
614
615 CS_SET_DISK_4(diskRec.ar_repo_id_4, repo_id);
616 CS_SET_DISK_8(diskRec.ar_offset_8, repo_offset);
617 CS_SET_DISK_4(diskRec.ar_hash_4, alias_hash);
618
619 enter_();
620
621 MSAliasFile *af = getAliasFile();
622 frompool_(af);
623
624 found = af->findRec(&diskRec);
625
626 backtopool_(af);
627 return_(found);
628}
629
630/////////////////////////////////////
631MSSysMeta::MSSysMeta(MSAlias *msa)
632{
633 md_myMSAlias = msa;
634 md_isFileInUse = false;
635 md_NextLink = md_PrevLink = NULL;
636
637 mtab = MSMetaDataTable::newMSMetaDataTable(RETAIN(msa->iDatabase_br));
638}
639
640//------------------------
641MSSysMeta::~MSSysMeta()
642{
643 if (mtab)
644 mtab->release();
645
646 if (md_myMSAlias)
647 md_myMSAlias->release();
648}
649
650//------------------------
651void MSSysMeta::returnToPool()
652{
653 enter_();
654 push_(this);
655
656
657 md_isFileInUse = false;
658
659 if (!md_myMSAlias->iClosing) {
660 lock_(&md_myMSAlias->iSysTablePoolLock); // It may be better if the pool had it's own lock.
661 md_nextFile = md_myMSAlias->iSysTablePool;
662 md_myMSAlias->iSysTablePool - this;
663 unlock_(&md_myMSAlias->iSysTablePoolLock);
664 }
665
666 release_(this);
667 exit_();
668}
669//------------------------
670bool MSSysMeta::matchAlias(uint32_t repo_id, uint64_t repo_offset, const char *alias)
671{
672 mtab->seqScanInit();
673 return mtab->matchAlias(repo_id, repo_offset, alias);
674}
675
676/////////////////////////////////////
677/////////////////////////////////////
678MSAliasFile::MSAliasFile(MSAliasFileShare *share)
679{
680 ba_share = share;
681 ba_isFileInUse = false;
682 ba_NextLink = ba_PrevLink = NULL;
683
684 iCurrentRec = 0;
685 iBucketCache = NULL;
686 iStartBucket = iCurrentBucket = NULL;
687 iBucketChain = NULL;
688 iLoading = false;
689 ba_nextFile = NULL;
690
691 iFile = CSFile::newFile(RETAIN(ba_share->msa_filePath));
692 iFile->open(CSFile::DEFAULT);
693
694
695}
696
697//------------------------
698MSAliasFile::~MSAliasFile()
699{
700 if (iFile)
701 iFile->release();
702
703 if (iBucketCache)
704 cs_free(iBucketCache);
705}
706
707//------------------------
708void MSAliasFile::startLoad()
709{
710 enter_();
711
712 ASSERT(!iLoading);
713
714// iBucketCache = (MSADiskBucketRec*) cs_malloc(BUCKET_LIST_SIZE * sizeof(MSADiskBucketRec));
715// memset(iBucketCache, 0, BUCKET_LIST_SIZE * sizeof(MSADiskBucketRec));
716 iLoading = true;
717
718 exit_();
719}
720
721//------------------------
722void MSAliasFile::finishLoad()
723{
724 enter_();
725 ASSERT(iLoading);
726 // Write the bucket cache to disk.
727// for (iCurrentBucket && iCurrentBucket->getSize()) {
728 // To Be implemented.
729// }
730// cs_free(iBucketCache);
731 iBucketCache = NULL;
732 iLoading = false;
733 exit_();
734}
735
736//------------------------
737void MSAliasFile::returnToPool()
738{
739 enter_();
740 push_(this);
741
742 if (iLoading) {
743 // If iLoading is still set then probably an exception has been thrown.
744 try_(a) {
745 finishLoad();
746 }
747 catch_(a)
748 iLoading = false;
749 cont_(a);
750 }
751
752 ba_isFileInUse = false;
753
754 if (!ba_share->msa_closing) {
755 lock_(&ba_share->msa_poolLock);
756 ba_nextFile = ba_share->msa_pool;
757 ba_share->msa_pool = this;
758 unlock_(&ba_share->msa_poolLock);
759 }
760
761 release_(this);
762 exit_();
763}
764
765//------------------------
766// The bucket chain is treated as a circular list.
767bool MSAliasFile::nextBucket(bool with_space)
768{
769 bool have_bucket = false;
770 enter_();
771
772 while (!have_bucket){
773 if (iCurrentBucket) {
774 iCurrentBucket = iCurrentBucket->getNextLink();
775 if (!iCurrentBucket)
776 iCurrentBucket = iBucketChain->getFront();
777 if (iCurrentBucket == iStartBucket)
778 break;
779 } else {
780 iCurrentBucket = iBucketChain->getFront();
781 iStartBucket = iCurrentBucket;
782 }
783
784 if ((iCurrentBucket->getSize() && !with_space) || (with_space && (iCurrentBucket->getSize() < NUM_RECORDS_PER_BUCKET))){
785 // Only read the portion of the bucket containing records.
786 iCurrentRec = iCurrentBucket->getEndOfRecords(); // The current record is set just beyond the last valid record.
787 size_t size = iCurrentRec * sizeof(MSDiskAliasRec);
788 iFile->read(iBucket, iCurrentBucket->bi_records_offset, size, size);
789 have_bucket = true;
790 }
791 }
792
793 return_(have_bucket);
794}
795
796//------------------------
797MSDiskAliasPtr MSAliasFile::nextRec()
798{
799 MSDiskAliasPtr rec = NULL;
800 bool have_rec;
801 enter_();
802
803 while ((!(have_rec = scanBucket())) && nextBucket(false));
804
805 if (have_rec)
806 rec = &(iBucket[iCurrentRec]);
807
808 return_(rec);
809}
810
811//------------------------
812// When starting a search:
813// If a bucket is already loaded and it is in the correct bucket chain
814// then search it first. In this case then the search starts at the current
815// bucket in the chain.
816//
817// Searches are from back to front with the idea that the more recently
818// added objects will be seached for more often and they are more likely
819// to be at the end of the chain.
820MSDiskAliasPtr MSAliasFile::findRec(uint32_t hash)
821{
822 MSDiskAliasPtr rec = NULL;
823 MSABucketLinkedList *list = ba_share->getBucketChain(hash);
824 enter_();
825
826 CS_SET_DISK_4(iDiskHash_4, hash);
827 if (list == iBucketChain) {
828 // The search is performed back to front.
829 iCurrentRec = iCurrentBucket->getEndOfRecords(); // Position the start just beyond the last valid record.
830 iStartBucket = iCurrentBucket;
831 if (scanBucket()) {
832 rec = &(iBucket[iCurrentRec]);
833 goto done;
834 }
835 } else {
836 iBucketChain = list;
837 iCurrentBucket = NULL;
838 iStartBucket = NULL;
839 }
840
841 if (nextBucket(false))
842 rec = nextRec();
843
844done:
845 return_(rec);
846}
847
848//------------------------
849bool MSAliasFile::findRec(MSDiskAliasPtr theRec)
850{
851 MSDiskAliasPtr aRec = NULL;
852 bool found = false;
853 enter_();
854
855 aRec = findRec(CS_GET_DISK_4(theRec->ar_hash_4));
856 while ( aRec && !found) {
857 if (CS_EQ_DISK_4(aRec->ar_repo_id_4, theRec->ar_repo_id_4) && CS_EQ_DISK_8(aRec->ar_offset_8, theRec->ar_offset_8))
858 found = true;
859 else
860 aRec = nextRec();
861 }
862 return_(found);
863}
864
865//------------------------
866void MSAliasFile::addRec(MSDiskAliasPtr new_rec)
867{
868 MSABucketLinkedList *list = ba_share->getBucketChain(CS_GET_DISK_4(new_rec->ar_hash_4));
869 enter_();
870 lock_(&ba_share->msa_writeLock);
871
872 if (iBucketChain != list) {
873 iBucketChain = list;
874 iCurrentBucket = NULL;
875 iStartBucket = NULL;
876 } else
877 iStartBucket = iCurrentBucket;
878
879 if ((iCurrentBucket && (iCurrentBucket->getSize() < NUM_RECORDS_PER_BUCKET)) || nextBucket(true)) { // Find a bucket with space in it for a record.
880 uint32_t size = iCurrentBucket->getSize();
881 uint32_t end_of_records = iCurrentBucket->getEndOfRecords();
882
883 if (size == end_of_records) { // No holes in the recored list
884 iCurrentRec = end_of_records;
885 } else { // Search for the empty record
886 iCurrentRec = end_of_records -2;
887 while (iCurrentRec && !CS_IS_NULL_DISK_4(iBucket[iCurrentRec].ar_repo_id_4))
888 iCurrentRec--;
889
890 ASSERT(CS_IS_NULL_DISK_4(iBucket[iCurrentRec].ar_repo_id_4));
891 }
892
893 memcpy(&iBucket[iCurrentRec], new_rec, sizeof(MSDiskAliasRec)); // Add the record to the cached bucket.
894
895 iCurrentBucket->recAdded(iFile, iCurrentRec); // update the current bucket header.
896 } else { // A new bucket must be added to the chain.
897 MSADiskBucketHeadRec new_bucket = {0};
898 CSDiskValue8 disk_8_value;
899 uint64_t new_bucket_offset;
900 MSABucketInfo *next, *prev;
901
902 next = iBucketChain->getFront();
903 prev = iBucketChain->getBack();
904
905 // Set the next and prev bucket offsets in the new bucket record.
906 CS_SET_DISK_8(new_bucket.ab_prev_bucket_8, prev->bi_bucket_offset);
907 CS_SET_DISK_8(new_bucket.ab_next_bucket_8, next->bi_bucket_offset);
908
909 if (ba_share->msa_empty_buckets.getSize()) { // Get a bucket from the empty bucket list.
910 MSABucketInfo *empty_bucket = ba_share->msa_empty_buckets.removeFront();
911
912 new_bucket_offset = empty_bucket->bi_bucket_offset;
913 empty_bucket->release();
914
915 // Update the index file's empty bucket list
916 if (ba_share->msa_empty_buckets.getSize() == 0)
917 CS_SET_NULL_DISK_8(disk_8_value);
918 else
919 CS_SET_DISK_8(disk_8_value, iBucketChain->getFront()->bi_bucket_offset);
920
921 iFile->write(&disk_8_value, offsetof(MSAliasHeadRec,ah_free_list_8) , 8);
922 } else // There are no empty buckets so grow the file.
923 new_bucket_offset = ba_share->msa_fileSize;
924
925 // Write the new bucket's record header to the file
926 iFile->write(&new_bucket, new_bucket_offset, sizeof(MSADiskBucketHeadRec));
927
928 // Insert the new bucket into the bucket chain on the disk.
929 CS_SET_DISK_8(disk_8_value, new_bucket_offset);
930 iFile->write(&disk_8_value, prev->bi_bucket_offset + offsetof(MSADiskBucketHeadRec,ab_next_bucket_8), 8);
931 iFile->write(&disk_8_value, next->bi_bucket_offset + offsetof(MSADiskBucketHeadRec,ab_prev_bucket_8), 8);
932
933 // Update the file size in the file header if required
934 if (ba_share->msa_fileSize == new_bucket_offset) {
935 ba_share->msa_fileSize += sizeof(MSADiskBucketRec); // Note this is MSADiskBucketRec not MSADiskBucketHeadRec
936
937 CS_SET_DISK_8(disk_8_value, ba_share->msa_fileSize);
938 iFile->write(&disk_8_value, offsetof(MSAliasHeadRec,ah_file_size_8) , 8);
939 }
940
941 // Add the info rec into the bucket chain in RAM.
942 iCurrentBucket = MSABucketInfo::newMSABucketInfo(new_bucket_offset, 1, 0);
943 iBucketChain->addFront(iCurrentBucket);
944 iCurrentRec = 0;
945 }
946
947 uint64_t offset;
948 offset = iCurrentBucket->bi_records_offset + iCurrentRec * sizeof(MSDiskAliasRec);
949
950 // Write the new index entry to the index file.
951 iFile->write(new_rec, offset, sizeof(MSDiskAliasRec));
952
953 unlock_(&ba_share->msa_writeLock);
954
955 exit_();
956}
957//------------------------
958void MSAliasFile::deleteCurrentRec()
959{
960 MSDiskAliasPtr rec = &(iBucket[iCurrentRec]);
961 uint64_t offset;
962 enter_();
963
964 CS_SET_NULL_DISK_4(rec->ar_repo_id_4);
965 offset = iCurrentBucket->bi_records_offset + iCurrentRec * sizeof(MSDiskAliasRec);
966
967 lock_(&ba_share->msa_writeLock);
968
969 // Update the index file. It is assumed that repo_id is the first 4 bytes of 'rec'.
970 iFile->write(rec, offset, 4);
971
972 iCurrentBucket->recRemoved(iFile, iCurrentRec, iBucket);
973
974 unlock_(&ba_share->msa_writeLock);
975
976 exit_();
977}
978
979//------------------------
980void MSAliasFile::updateCurrentRec(MSDiskAliasPtr update_rec)
981{
982 uint64_t offset;
983 enter_();
984
985 // ASSERT that the updated rec still belongs to this bucket chain.
986 ASSERT(ba_share->getBucketChain(CS_GET_DISK_4(update_rec->ar_hash_4)) == iBucketChain);
987 ASSERT(!CS_IS_NULL_DISK_4(iBucket[iCurrentRec].ar_repo_id_4)); // We should not be updating a deleted record.
988
989 lock_(&ba_share->msa_writeLock);
990 offset = iCurrentBucket->bi_records_offset + iCurrentRec * sizeof(MSDiskAliasRec);
991
992 // Update the record on disk.
993 iFile->write(update_rec, offset, sizeof(MSDiskAliasRec));
994
995 // Update the record in memory.
996 CS_COPY_DISK_4(iBucket[iCurrentRec].ar_repo_id_4, update_rec->ar_repo_id_4);
997 CS_COPY_DISK_8(iBucket[iCurrentRec].ar_offset_8, update_rec->ar_offset_8);
998
999 unlock_(&ba_share->msa_writeLock);
1000 exit_();
1001}
1002
1003
1004//------------------------
1005MSABucketInfo *MSABucketInfo::newMSABucketInfo(uint64_t offset, uint32_t num, uint32_t last)
1006{
1007 MSABucketInfo *bucket;
1008 new_(bucket, MSABucketInfo(offset, num, last));
1009 return bucket;
1010}
1011//------------------------
1012void MSABucketInfo::recRemoved(CSFile *iFile, uint32_t idx, MSDiskAliasRec bucket[])
1013{
1014 MSADiskBucketHeadRec head;
1015 enter_();
1016
1017 ASSERT(idx < bi_end_of_records);
1018
1019 bi_num_recs--;
1020 if (!bi_num_recs) {
1021 // It would be nice to remove this bucket from the
1022 // bucket list and place it on the empty list.
1023 // Before this can be done a locking method would
1024 // be needed to block anyone from reading this
1025 // bucket while it was being moved.
1026 //
1027 // I haven't done this because I have been trying
1028 // to avoid read locks.
1029 bi_end_of_records = 0;
1030 } else if ((bi_end_of_records -1) == idx) {
1031 while (idx && CS_IS_NULL_DISK_4(bucket[idx].ar_repo_id_4))
1032 idx--;
1033
1034 if ((idx ==0) && CS_IS_NULL_DISK_4(bucket[0].ar_repo_id_4))
1035 bi_end_of_records = 0;
1036 else
1037 bi_end_of_records = idx +1;
1038
1039 ASSERT(bi_end_of_records >= bi_num_recs);
1040 }
1041
1042 // Update the index file.
1043 CS_SET_DISK_4(head.ab_num_recs_4, bi_num_recs);
1044 CS_SET_DISK_4(head.ab_eor_rec_4, bi_end_of_records);
1045 iFile->write(&head.ab_num_recs_4, bi_bucket_offset + offsetof(MSADiskBucketHeadRec,ab_num_recs_4), 8);
1046 exit_();
1047}
1048
1049//------------------------
1050void MSABucketInfo::recAdded(CSFile *iFile, uint32_t idx)
1051{
1052 MSADiskBucketHeadRec head;
1053 enter_();
1054
1055 ASSERT(bi_num_recs < NUM_RECORDS_PER_BUCKET);
1056 ASSERT(idx < NUM_RECORDS_PER_BUCKET);
1057
1058 bi_num_recs++;
1059 if (idx == bi_end_of_records)
1060 bi_end_of_records++;
1061
1062 // Update the index file.
1063 CS_SET_DISK_4(head.ab_num_recs_4, bi_num_recs);
1064 CS_SET_DISK_4(head.ab_eor_rec_4, bi_end_of_records);
1065 iFile->write(&head.ab_num_recs_4, bi_bucket_offset + offsetof(MSADiskBucketHeadRec,ab_num_recs_4), 8);
1066 exit_();
1067}
1068
1069//////////////////////////////////
1070MSAliasFile *MSAliasFileShare::getPoolFile()
1071{
1072 MSAliasFile *af;
1073 enter_();
1074
1075 lock_(&msa_poolLock);
1076 if ((af = msa_pool)) {
1077 msa_pool = af->ba_nextFile;
1078 } else {
1079 new_(af, MSAliasFile(this));
1080 msa_poolFiles.addFront(af);
1081 }
1082 unlock_(&msa_poolLock);
1083
1084 af->ba_nextFile = NULL;
1085 ASSERT(!af->ba_isFileInUse);
1086 af->ba_isFileInUse = true;
1087 af->retain();
1088
1089 return_(af);
1090}
1091#endif // HAVE_ALIAS_SUPPORT
1092
10930
=== removed file 'plugin/pbms/src/alias_ms.h'
--- plugin/pbms/src/alias_ms.h 2011-03-14 05:40:28 +0000
+++ plugin/pbms/src/alias_ms.h 1970-01-01 00:00:00 +0000
@@ -1,350 +0,0 @@
1/* Copyright (C) 2008 PrimeBase Technologies GmbH, Germany
2 *
3 * PrimeBase Media Stream for MySQL
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * Barry Leslie
20 *
21 * 2008-12-30
22 *
23 * H&G2JCtL
24 *
25 * BLOB alias index.
26 *
27 */
28
29
30#pragma once
31#ifndef __ALIAS_MS_H__
32#define __ALIAS_MS_H__
33#include <stddef.h>
34
35#include "defs_ms.h"
36#include "cslib/CSStorage.h"
37
38#define MS_ALIAS_FILE_MAGIC 0x5954228A
39#define MS_ALIAS_FILE_VERSION 1
40#define BLOB_ALIAS_LENGTH 1024
41#define INVALID_ALIAS_HASH ((uint32_t)-1)
42
43#ifdef HAVE_ALIAS_SUPPORT
44class MSOpenTable;
45class MSDatabase;
46class CSHTTPOutputStream;
47class MSMetaDataTable;
48
49#define ACTIVE_ALIAS_INDEX "ms_blob_alias.idx"
50
51#define NUM_RECORDS_PER_BUCKET 254 // 254 = bucket size of 4 K
52#define BUCKET_LIST_SIZE 1024
53
54typedef struct MSAliasHead {
55 CSDiskValue4 ah_magic_4; /* Table magic number. */
56 CSDiskValue2 ah_version_2; /* The header version. */
57 CSDiskValue2 ah_head_size_2; /* The size of the header. */
58 CSDiskValue8 ah_file_size_8; /* The size of the file. */
59
60 CSDiskValue8 ah_free_list_8; /* The offset of the first bucket in the free list. */
61
62 CSDiskValue2 ah_num_buckets_2; /* The number of bucket chains in the index. (BUCKET_LIST_SIZE when created)*/
63 CSDiskValue4 ah_bucket_size_4; /* The size of each bucket. (NUM_RECORDS_PER_BUCKET when created)*/
64
65} MSAliasHeadRec, *MSAliasHeadPtr;
66
67/*
68 * When a record is freed ba_repo_id_4 is set to zero
69*/
70typedef struct MSDiskAliasRec {
71 CSDiskValue4 ar_repo_id_4; /* File ID. Not zero when allocated. */
72 CSDiskValue8 ar_offset_8; /* Offset into the file of the BLOB. */
73 CSDiskValue4 ar_hash_4; /* The hash value of the alias string. (Is assumed to be at the end of the structure.*/
74} MSDiskAliasRec, *MSDiskAliasPtr;
75
76typedef struct MSADiskBucketHead {
77 CSDiskValue8 ab_prev_bucket_8; /* The file offset of the previouse bucket in the chain. */
78 CSDiskValue8 ab_next_bucket_8; /* The file offset of the next bucket in the chain. */
79 CSDiskValue4 ab_num_recs_4; /* The number of used record in the bucket. */
80 CSDiskValue4 ab_eor_rec_4; /* (End Of Records) The position of the first free record after all the records in the bucket. */
81} MSADiskBucketHeadRec, *MSADiskBucketHeadPtr;
82
83typedef struct MSADiskBucket {
84 MSADiskBucketHeadRec ab_heaher;
85 MSDiskAliasRec ab_records[NUM_RECORDS_PER_BUCKET]; /* The start of the records in the bucket. */
86} MSADiskBucketRec, *MSADiskBucketPtr;
87
88/*
89 * MSABucketInfo stores bucket information in RAM.
90 */
91class MSABucketInfo: public CSOrderKey {
92public:
93
94 MSABucketInfo(uint64_t offset, uint32_t num, uint32_t end_of_records):
95 bi_bucket_offset(offset),
96 bi_records_offset(offset + offsetof(MSADiskBucketRec, ab_records)),
97 bi_num_recs(num),
98 bi_end_of_records(end_of_records),
99 bi_NextLink(NULL),
100 bi_PrevLink(NULL)
101 {}
102
103 uint64_t bi_bucket_offset; /* The file offset of the bucket. */
104
105 uint64_t bi_records_offset; /* The file offset of the first record in the bucket. */
106
107 // Required method for item in a CSLinkedList.
108 virtual MSABucketInfo *getNextLink() { return bi_NextLink; }
109 virtual MSABucketInfo *getPrevLink() { return bi_PrevLink; }
110 virtual void setNextLink(CSObject *link) { bi_NextLink = (MSABucketInfo*)link; }
111 virtual void setPrevLink(CSObject *link) { bi_PrevLink = (MSABucketInfo*)link; }
112
113 virtual CSObject *getKey() { return this;}
114 virtual int compareKey(CSOrderKey *x) {
115 MSABucketInfo *key = (MSABucketInfo *) x;
116
117 if (bi_bucket_offset != key->bi_bucket_offset)
118 return 0;
119
120 return (bi_bucket_offset < key->bi_bucket_offset)? -1: 1;
121 }
122
123 static MSABucketInfo *newMSABucketInfo(uint64_t offset, uint32_t num = 0, uint32_t last = 0);
124
125 uint32_t getSize() { return bi_num_recs;}
126 uint32_t getEndOfRecords() { return bi_end_of_records;}
127 void recAdded(CSFile *iFile, uint32_t idx);
128 void recRemoved(CSFile *iFile, uint32_t idx, MSDiskAliasRec bucket[]);
129
130private:
131 // (bi_end_of_records -1) is the index of the last valid record in the bucket.
132 // free records can actually appear any where in the bucket unless it has
133 // just been compressed.
134 uint32_t bi_num_recs; /* The number of records in the bucket. */
135 uint32_t bi_end_of_records; /* The index of the start of the free records in the bucket. */
136
137 MSABucketInfo *bi_NextLink;
138 MSABucketInfo *bi_PrevLink;
139};
140
141class MSABucketLinkedList: public CSLinkedList {
142public:
143
144 /* Value is returned referenced. */
145 MSABucketInfo *removeBack() { return (MSABucketInfo*) CSLinkedList::removeBack();}
146
147 /* Value is returned NOT referenced. */
148 MSABucketInfo *getBack(){ return (MSABucketInfo*) CSLinkedList::getBack();}
149
150 /* Value is returned NOT referenced. */
151 MSABucketInfo *getFront(){ return (MSABucketInfo*) CSLinkedList::getFront();}
152
153 /* Value is returned referenced. */
154 MSABucketInfo *removeFront(){ return (MSABucketInfo*) CSLinkedList::removeFront();}
155};
156
157typedef struct MSAliasRec {
158 uint32_t repo_id;
159 uint64_t repo_offset;
160 uint32_t alias_hash;
161} MSAliasRec, *MSAliasPtr;
162
163class MSAliasFile : public CSPooled, public CSRefObject {
164public:
165 class MSAliasFileShare *ba_share;
166 bool ba_isFileInUse;
167 MSAliasFile *ba_nextFile; /* Next file available in the pool */
168
169 MSAliasFile(MSAliasFileShare *share);
170 virtual ~MSAliasFile();
171
172 // Required method for CSPool item.
173 virtual void returnToPool();
174
175
176 // Required method for item in a CSLinkedList.
177 virtual CSObject *getNextLink() { return ba_NextLink; }
178 virtual CSObject *getPrevLink() { return ba_PrevLink; }
179 virtual void setNextLink(CSObject *link) { ba_NextLink = link; }
180 virtual void setPrevLink(CSObject *link) { ba_PrevLink = link; }
181
182 // Index file operations.
183 MSDiskAliasPtr findRec(uint32_t hash);
184 MSDiskAliasPtr nextRec();
185 void addRec(MSDiskAliasPtr rec);
186 void deleteCurrentRec();
187 void updateCurrentRec(MSDiskAliasPtr rec);
188 bool findRec(MSDiskAliasPtr rec);
189
190 /* When a load is inprogress locks are not required and writes are batched. */
191 void startLoad();
192 void finishLoad();
193
194
195private:
196 bool nextBucket(bool with_space);
197
198 bool scanBucket()
199 {
200 while (iCurrentRec) {
201 iCurrentRec--;
202 if ( CS_EQ_DISK_4(iDiskHash_4, iBucket[iCurrentRec].ar_hash_4)
203 && !CS_IS_NULL_DISK_4(iBucket[iCurrentRec].ar_repo_id_4))
204 return true;
205 }
206 return false;
207 }
208
209 CSFile *iFile; // The index file.
210
211 bool iLoading;
212 MSADiskBucketRec *iBucketCache; // The bucket cache is used during index loading in single thread mode.
213 MSDiskAliasRec iBucket[NUM_RECORDS_PER_BUCKET];// The current bucket loaded from disk.
214 MSABucketLinkedList *iBucketChain; // The bucket list for the current hash value.
215 MSABucketInfo *iStartBucket; // The file offset of the bucket the search started at.
216 MSABucketInfo *iCurrentBucket;// The currnet bucket, NULL if no bucket is loaded.
217
218 CSDiskValue4 iDiskHash_4; // The current hash value we are looking for in disk byte order.
219 uint32_t iCurrentRec; // The current record position in the current bucket.
220
221 CSObject *ba_NextLink;
222 CSObject *ba_PrevLink;
223
224};
225
226//===========================================
227class MSAliasFileShare: public CSObject {
228public:
229 MSAliasFileShare(CSPath *path):
230 msa_filePath(path),
231 msa_fileSize(0),
232 msa_pool(NULL),
233 msa_closing(false)
234 {
235 bool isdir = false;
236 if (path->exists(&isdir))
237 msa_fileSize = path->getSize();
238 }
239
240 ~MSAliasFileShare()
241 {
242 msa_poolFiles.clear();
243 if (msa_filePath)
244 msa_filePath->release();
245
246 for (uint32_t i =0; i < BUCKET_LIST_SIZE; i++)
247 msa_buckets[i].clear();
248
249 msa_empty_buckets.clear();
250 }
251
252 void close() { msa_poolFiles.clear();}
253
254 MSABucketLinkedList *getBucketChain(uint32_t hash) { return msa_buckets + (hash % BUCKET_LIST_SIZE); }
255 MSAliasFile *getPoolFile();
256
257 CSLinkedList msa_poolFiles; /* A list of all files in this pool */
258 uint64_t msa_fileSize;
259 CSPath *msa_filePath;
260 CSLock msa_writeLock;
261 MSAliasFile *msa_pool; /* A list of files currently not in use. */
262
263 CSLock msa_poolLock;
264 bool msa_closing;
265 MSABucketLinkedList msa_empty_buckets; /* A list of unused buckets. */
266
267 MSABucketLinkedList msa_buckets[BUCKET_LIST_SIZE]; /* An array of bucket chains. */
268};
269
270//===========================================
271class MSSysMeta : public CSRefObject, public CSPooled {
272public:
273 class MSAlias *md_myMSAlias;
274 bool md_isFileInUse;
275 MSSysMeta *md_nextFile; /* Next file available in the pool */
276
277 MSSysMeta(MSAlias *msa);
278 virtual ~MSSysMeta();
279
280 virtual void returnToPool();
281
282 virtual CSObject *getNextLink() { return md_NextLink; }
283 virtual CSObject *getPrevLink() { return md_PrevLink; }
284 virtual void setNextLink(CSObject *link) { md_NextLink = link; }
285 virtual void setPrevLink(CSObject *link) { md_PrevLink = link; }
286
287 bool matchAlias(uint32_t repo_id, uint64_t repo_offset, const char *alias);
288
289private:
290 MSMetaDataTable *mtab;
291 CSObject *md_NextLink;
292 CSObject *md_PrevLink;
293
294};
295
296//===========================================
297class MSAlias : public CSSharedRefObject {
298public:
299
300 MSAlias(MSDatabase *db_noref);
301 ~MSAlias();
302
303 void ma_open(const char *file_name = ACTIVE_ALIAS_INDEX);
304 void ma_close();
305 void ma_delete() {iDelete = true;}
306
307 uint32_t addAlias(uint32_t repo_id, uint64_t repo_offset, const char *alias);
308private:
309 void addAlias(MSAliasFile *af, MSAliasRec *rec);
310public:
311 void deleteAlias(MSDiskAliasPtr diskRec);
312 void deleteAlias(uint32_t repo_id, uint64_t repo_offset, uint32_t alias_hash);
313 void resetAlias(uint32_t old_repo_id, uint64_t old_repo_offset, uint32_t alias_hash, uint32_t new_repo_id, uint64_t new_repo_offset);
314
315 bool findBlobByAlias(const char *alias, bool *referenced, uint32_t *repo_id = NULL, uint64_t *repo_offset = NULL);
316 bool blobAliasExists(uint32_t repo_id, uint64_t repo_offset, uint32_t alias_hash);
317private:
318 bool findBlobByAlias(MSAliasFile *af, const char *alias, bool *referenced, uint32_t *repo_id = NULL, uint64_t *repo_offset = NULL);
319public:
320 static uint32_t hashAlias(const char *ptr);
321
322 void MSAliasBuild();
323
324 friend class MSAliasFile;
325 friend class MSSysMeta;
326
327private:
328 MSDatabase *iDatabase_br; // This is a back reference so this reference is not counted.
329 CSPath *iFilePath;
330
331 bool iClosing;
332 bool iDelete; // true when the alias index file should be deleted.
333
334 MSAliasFileShare *iFileShare; // File information shared between all files in the pool.
335
336 CSLock iSysTablePoolLock;
337 MSSysMeta *iSysTablePool; /* A list of files currently not in use. */
338 CSLinkedList iPoolSysTables; /* A list of all files in this pool */
339
340 MSAliasFile *getAliasFile() { return iFileShare->getPoolFile();}
341 void buildAliasIndex();
342 void MSAliasCompress(CSFile *fa, CSSortedList *freeList, MSABucketLinkedList *bucketChain);
343 void MSAliasLoad();
344 bool hasBlobAlias(uint32_t repo_id, uint64_t repo_offset, const char *alias, bool *referenced);
345};
346
347
348#endif //HAVE_ALIAS_SUPPORT
349
350#endif // __ALIAS_MS_H__
3510
=== removed file 'plugin/pbms/src/api_ms.cc'
--- plugin/pbms/src/api_ms.cc 2010-12-18 04:43:40 +0000
+++ plugin/pbms/src/api_ms.cc 1970-01-01 00:00:00 +0000
@@ -1,270 +0,0 @@
1#ifdef NOT_USED_IN_ANY_THING
2
3/* Copyright (C) 2008 PrimeBase Technologies GmbH, Germany
4 *
5 * PrimeBase Media Stream for MySQL
6 *
7 * This program is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation; either version 2 of the License, or
10 * (at your option) any later version.
11 *
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with this program; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
20 *
21 * Barry Leslie
22 *
23 * 2007-11-25
24 *
25 * H&G2JCtL
26 *
27 */
28
29#include "cslib/CSConfig.h"
30#include "cslib/CSGlobal.h"
31#include "cslib/CSLog.h"
32#include "cslib/CSStrUtil.h"
33#include "cslib/CSHTTPStream.h"
34#include "cslib/CSStream.h"
35
36#include "repository_ms.h"
37#include "open_table_ms.h"
38#include "mysql_ms.h"
39
40//-----------------------------------------------------------------------------------------------
41void PBMSGetError(void *v_bs_thread, PBMSResultPtr result)
42{
43 CSThread *ms_thread = (CSThread*)v_bs_thread;
44
45 ASSERT(ms_thread);
46 memset(result, 0, sizeof(PBMSResultRec));
47
48 result->mr_code = ms_thread->myException.getErrorCode();
49 cs_strcpy(MS_RESULT_MESSAGE_SIZE, result->mr_message, ms_thread->myException.getMessage());
50}
51
52//-----------------------------------------------------------------------------------------------
53void *PBMSInitBlobStreamingThread(char *thread_name, PBMSResultPtr result)
54{
55 CSThread *ms_thread = new CSThread( NULL);
56
57 if (!ms_thread) {
58 memset(result, 0, sizeof(PBMSResultRec));
59 result->mr_code = ENOMEM;
60 cs_strcpy(MS_RESULT_MESSAGE_SIZE, result->mr_message, "CSThread::newThread() failed.");
61 return NULL;
62 }
63
64 ms_thread->pbms_api_owner = true;
65 if (!CSThread::attach(ms_thread)) {
66 memset(result, 0, sizeof(PBMSResultRec));
67 result->mr_code = ms_thread->myException.getErrorCode();
68 cs_strcpy(MS_RESULT_MESSAGE_SIZE, result->mr_message, ms_thread->myException.getMessage());
69 ms_thread->release();
70 ms_thread = NULL;
71 } else
72 ms_thread->threadName = CSString::newString(thread_name);
73
74 return ms_thread;
75}
76
77
78//-----------------------------------------------------------------------------------------------
79void PBMSDeinitBlobStreamingThread(void *v_bs_thread)
80{
81 CSThread *ms_thread = (CSThread*)v_bs_thread;
82
83 ASSERT(ms_thread);
84
85 CSThread::detach(ms_thread);
86 // ms_thread->release(); Don't do this. Ownership of the thread is passed to the attach call so the thread is released when it is detached.
87}
88
89//-----------------------------------------------------------------------------------------------
90bool PBMSCreateBlob(PBMSBlobIDPtr blob_id, char *database_name, uint64_t size)
91{
92 MSOpenTable *otab = NULL;
93 CSString *iTableURI = NULL;
94 CSString *CSContenttype = NULL;
95 bool done_ok = true;
96
97 enter_();
98
99 try_(a) {
100 otab = MSTableList::getOpenTableForDB(MSDatabase::getDatabaseID(database_name, false));
101
102 otab->createBlob(blob_id, size, NULL, 0);
103 }
104
105 catch_(a) {
106 done_ok = false;
107 }
108 cont_(a);
109
110 exit:
111 if (otab)
112 otab->returnToPool();
113
114 if (CSContenttype)
115 CSContenttype->release();
116
117 if (iTableURI)
118 iTableURI->release();
119
120 return_(done_ok);
121}
122
123//-----------------------------------------------------------------------------------------------
124bool PBMSWriteBlob(PBMSBlobIDPtr blob_id, char *data, size_t size, size_t offset)
125{
126 MSOpenTable *otab;
127 MSRepoFile *repo_file;
128 bool done_ok = true;
129
130 enter_();
131
132 try_(a) {
133 if (!(otab = MSTableList::getOpenTableForDB(blob_id->bi_db_id))) {
134 char buffer[CS_EXC_MESSAGE_SIZE];
135 char id_str[12];
136
137 snprintf(id_str, 12, "%"PRIu32"", blob_id->bi_db_id);
138
139 cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Unknown database id # ");
140 cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, id_str);
141 CSException::throwException(CS_CONTEXT, MS_ERR_UNKNOWN_DB, buffer);
142 }
143 frompool_(otab);
144 repo_file = otab->getDB()->getRepoFileFromPool( blob_id->bi_tab_id, false);
145 frompool_(repo_file);
146 // It is assumed that at this point the blob is a repository blob and so the
147 // blob_id->bi_blob_id is actually the repository blob offset.
148 repo_file->writeBlobChunk(blob_id, blob_id->bi_blob_id, offset, size, data);
149 backtopool_(repo_file);
150 backtopool_(otab);
151
152 }
153 catch_(a) {
154 done_ok = false;
155 }
156
157 cont_(a);
158
159 return_(done_ok);
160}
161
162//-----------------------------------------------------------------------------------------------
163bool PBMSReadBlob(PBMSBlobIDPtr blob_id, char *buffer, size_t *size, size_t offset)
164{
165 MSOpenTable *otab;
166 MSRepoFile *repo_file;
167 bool done_ok = true, is_repository_blob;
168
169 enter_();
170
171 is_repository_blob = (blob_id->bi_blob_type == MS_URL_TYPE_REPO);
172 try_(a) {
173 if (!(otab = MSTableList::getOpenTableByID(blob_id->bi_db_id, blob_id->bi_tab_id))) {
174 char buffer[CS_EXC_MESSAGE_SIZE];
175 char id_str[12];
176
177
178 cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Unknown database: ID # ");
179 snprintf(id_str, 12, "%"PRIu32"", blob_id->bi_db_id);
180 cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, id_str);
181 cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, " or table: ID #");
182 snprintf(id_str, 12, "%"PRIu32"", blob_id->bi_tab_id);
183 cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, id_str);
184 CSException::throwException(CS_CONTEXT, MS_ERR_UNKNOWN_DB, buffer);
185 }
186 uint32_t repo_id;
187 uint64_t rep_offset;
188
189
190 frompool_(otab);
191 if (is_repository_blob) {
192 repo_id = blob_id->bi_tab_id;
193 rep_offset = blob_id->bi_blob_id;
194 } else {
195 uint64_t blob_size;
196 uint16_t header_size;
197 otab->getDBTable()->readBlobHandle(otab, blob_id->bi_blob_id, &(blob_id->bi_auth_code), &repo_id, &rep_offset, &blob_size, &header_size, true);
198 }
199
200 repo_file = otab->getDB()->getRepoFileFromPool( repo_id, false);
201 frompool_(repo_file);
202 *size = repo_file->readBlobChunk(blob_id, rep_offset, offset, *size, buffer);
203 backtopool_(repo_file);
204 backtopool_(otab);
205
206 }
207 catch_(a) {
208 done_ok = false;
209 }
210
211 cont_(a);
212
213 return_(done_ok);
214}
215
216//-----------------------------------------------------------------------------------------------
217bool PBMSIDToURL(PBMSBlobIDPtr blob_id, PBMSBlobURLPtr url)
218{
219 MSBlobURL ms_blob;
220
221 ms_blob.bu_db_id = blob_id->bi_db_id;
222 ms_blob.bu_blob_id = blob_id->bi_blob_id;
223 ms_blob.bu_blob_ref_id = blob_id->bi_blob_ref_id;
224 ms_blob.bu_tab_id = blob_id->bi_tab_id;
225 ms_blob.bu_auth_code = blob_id->bi_auth_code;
226 ms_blob.bu_type = blob_id->bi_blob_type;
227 ms_blob.bu_blob_size = blob_id->bi_blob_size;
228 ms_blob.bu_server_id = ms_my_get_server_id();
229
230 PBMSBlobURLTools::buildBlobURL(&ms_blob, url);
231 return true;
232}
233
234//-----------------------------------------------------------------------------------------------
235bool PBMSURLToID(char *url, PBMSBlobIDPtr blob_id)
236{
237 MSBlobURL ms_blob;
238 bool done_ok = true;
239 enter_();
240
241 try_(a) {
242
243 if (!PBMSBlobURLTools::couldBeURL(url, &ms_blob)){
244 char buffer[CS_EXC_MESSAGE_SIZE];
245
246 cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Incorrect URL: ");
247 cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, url);
248 CSException::throwException(CS_CONTEXT, MS_ERR_INCORRECT_URL, buffer);
249 }
250
251 blob_id->bi_db_id = ms_blob.bu_db_id;
252 blob_id->bi_blob_id = ms_blob.bu_blob_id;
253 blob_id->bi_blob_ref_id = ms_blob.bu_blob_ref_id;
254 blob_id->bi_tab_id = ms_blob.bu_tab_id;
255 blob_id->bi_auth_code = ms_blob.bu_auth_code;
256 blob_id->bi_blob_type = ms_blob.bu_type;
257 blob_id->bi_blob_size = ms_blob.bu_blob_size;
258
259 }
260 catch_(a) {
261 done_ok = false;
262 }
263
264 cont_(a);
265
266 return_(done_ok);
267}
268
269
270#endif // NOT_USED_IN_ANY_THING
2710
=== removed file 'plugin/pbms/src/backup_ms.cc'
--- plugin/pbms/src/backup_ms.cc 2011-04-20 22:18:30 +0000
+++ plugin/pbms/src/backup_ms.cc 1970-01-01 00:00:00 +0000
@@ -1,733 +0,0 @@
1/* Copyright (C) 2009 PrimeBase Technologies GmbH, Germany
2 *
3 * PrimeBase Media Stream for MySQL
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * Barry Leslie
20 *
21 * 2009-05-29
22 *
23 * H&G2JCtL
24 *
25 * Repository backup.
26 *
27 * The backup is done by creating a new database with the same name and ID in the
28 * backup location. Then the pbms_dump table in the source database is initialized
29 * for a sequential scan for backup. This has the effect of locking all current repository
30 * files. Then the equvalent of 'insert into dst_db.pbms_dump (select * from src_db.pbms_dump);'
31 * is performed.
32 *
33 */
34
35#ifdef DRIZZLED
36#include <config.h>
37#include <drizzled/common.h>
38#include <drizzled/session.h>
39#include <drizzled/table.h>
40#include <drizzled/message/table.pb.h>
41#include <drizzled/charset.h>
42#include <drizzled/table_proto.h>
43#include <drizzled/field.h>
44#include <drizzled/field/varstring.h>
45#endif
46
47#include "cslib/CSConfig.h"
48
49#include <sys/types.h>
50#include <inttypes.h>
51
52#include "cslib/CSGlobal.h"
53#include "cslib/CSStrUtil.h"
54#include "cslib/CSStorage.h"
55
56#include "defs_ms.h"
57#include "system_table_ms.h"
58#include "open_table_ms.h"
59#include "table_ms.h"
60#include "database_ms.h"
61#include "repository_ms.h"
62#include "backup_ms.h"
63#include "transaction_ms.h"
64#include "systab_variable_ms.h"
65#include "systab_backup_ms.h"
66
67uint32_t MSBackupInfo::gMaxInfoRef;
68CSSyncSparseArray *MSBackupInfo::gBackupInfo;
69
70//==========================================
71MSBackupInfo::MSBackupInfo( uint32_t id,
72 const char *name,
73 uint32_t db_id_arg,
74 time_t start,
75 time_t end,
76 bool _isDump,
77 const char *location,
78 uint32_t cloudRef_arg,
79 uint32_t cloudBackupNo_arg ):
80 backupRefId(id),
81 db_name(NULL),
82 db_id(db_id_arg),
83 startTime(start),
84 completionTime(end),
85 dump(_isDump),
86 isRunning(false),
87 backupLocation(NULL),
88 cloudRef(cloudRef_arg),
89 cloudBackupNo(cloudBackupNo_arg)
90{
91 db_name = CSString::newString(name);
92 if (location && *location)
93 backupLocation = CSString::newString(location);
94}
95
96//-------------------------------
97MSBackupInfo::~MSBackupInfo()
98{
99 if (db_name)
100 db_name->release();
101
102 if (backupLocation)
103 backupLocation->release();
104}
105
106//-------------------------------
107void MSBackupInfo::startBackup(MSDatabase *pbms_db)
108{
109 MSDatabase *src_db;
110
111 enter_();
112 push_(pbms_db);
113
114 src_db = MSDatabase::getDatabase(db_id);
115 push_(src_db);
116
117 startTime = time(NULL);
118
119 src_db->startBackup(RETAIN(this));
120 release_(src_db);
121
122 isRunning = true;
123
124 pop_(pbms_db);
125 MSBackupTable::saveTable(pbms_db);
126 exit_();
127}
128
129//-------------------------------
130class StartDumpCleanUp : public CSRefObject {
131 bool do_cleanup;
132 uint32_t ref_id;
133
134 public:
135
136 StartDumpCleanUp(): CSRefObject(),
137 do_cleanup(false){}
138
139 ~StartDumpCleanUp()
140 {
141 if (do_cleanup) {
142 MSBackupInfo::gBackupInfo->remove(ref_id);
143 }
144 }
145
146 void setCleanUp(uint32_t id)
147 {
148 ref_id = id;
149 do_cleanup = true;
150 }
151
152 void cancelCleanUp()
153 {
154 do_cleanup = false;
155 }
156
157};
158
159MSBackupInfo *MSBackupInfo::startDump(MSDatabase *db, uint32_t cloud_ref, uint32_t backup_no)
160{
161 MSBackupInfo *info;
162 uint32_t ref_id;
163 StartDumpCleanUp *cleanup;
164
165 enter_();
166 push_(db);
167 lock_(gBackupInfo);
168
169 ref_id = gMaxInfoRef++;
170 new_(info, MSBackupInfo(ref_id, db->myDatabaseName->getCString(), db->myDatabaseID, time(NULL), 0, true, NULL, cloud_ref, backup_no));
171 push_(info);
172
173 gBackupInfo->set(ref_id, RETAIN(info));
174
175 info->isRunning = true;
176
177 pop_(info);
178 unlock_(gBackupInfo);
179 push_(info);
180
181 // Create a cleanup object to handle cleanup
182 // after a possible exception.
183 new_(cleanup, StartDumpCleanUp());
184 push_(cleanup);
185 cleanup->setCleanUp(ref_id);
186
187 MSBackupTable::saveTable(RETAIN(db));
188
189 cleanup->cancelCleanUp();
190 release_(cleanup);
191
192 pop_(info);
193 release_(db);
194
195 return_(info);
196}
197//-------------------------------
198void MSBackupInfo::backupCompleted(MSDatabase *db)
199{
200 completionTime = time(NULL);
201 isRunning = false;
202 MSBackupTable::saveTable(db);
203}
204
205//-------------------------------
206void MSBackupInfo::backupTerminated(MSDatabase *db)
207{
208 enter_();
209 push_(db);
210 lock_(gBackupInfo);
211
212 gBackupInfo->remove(backupRefId);
213 unlock_(gBackupInfo);
214
215 pop_(db);
216 MSBackupTable::saveTable(db);
217 exit_();
218}
219
220//==========================================
221MSBackup::MSBackup():
222CSDaemon(NULL),
223bu_info(NULL),
224bu_BackupList(NULL),
225bu_Compactor(NULL),
226bu_BackupRunning(false),
227bu_State(BU_COMPLETED),
228bu_SourceDatabase(NULL),
229bu_Database(NULL),
230bu_dst_dump(NULL),
231bu_src_dump(NULL),
232bu_size(0),
233bu_completed(0),
234bu_ID(0),
235bu_start_time(0),
236bu_TransactionManagerSuspended(false)
237{
238}
239
240MSBackup *MSBackup::newMSBackup(MSBackupInfo *info)
241{
242 MSBackup *bu;
243 enter_();
244
245 push_(info);
246
247 new_(bu, MSBackup());
248 push_(bu);
249 bu->bu_Database = MSDatabase::getBackupDatabase(RETAIN(info->backupLocation), RETAIN(info->db_name), info->db_id, true);
250 pop_(bu);
251
252 bu->bu_info = info;
253 pop_(info);
254
255 return_(bu);
256}
257
258//-------------------------------
259class StartBackupCleanUp : public CSRefObject {
260 bool do_cleanup;
261 MSBackup *backup;
262
263 public:
264
265 StartBackupCleanUp(): CSRefObject(),
266 do_cleanup(false){}
267
268 ~StartBackupCleanUp()
269 {
270 if (do_cleanup) {
271 backup->completeBackup();
272 }
273 }
274
275 void setCleanUp(MSBackup *bup)
276 {
277 backup = bup;
278 do_cleanup = true;
279 }
280
281 void cancelCleanUp()
282 {
283 do_cleanup = false;
284 }
285
286};
287
288void MSBackup::startBackup(MSDatabase *src_db)
289{
290 CSSyncVector *repo_list;
291 bool compacting = false;
292 MSRepository *repo;
293 StartBackupCleanUp *cleanup;
294 enter_();
295
296 // Create a cleanup object to handle cleanup
297 // after a possible exception.
298 new_(cleanup, StartBackupCleanUp());
299 push_(cleanup);
300 cleanup->setCleanUp(this);
301
302 bu_SourceDatabase = src_db;
303 repo_list = bu_SourceDatabase->getRepositoryList();
304 // Suspend the compactor before locking the list.
305 bu_Compactor = bu_SourceDatabase->getCompactorThread();
306 if (bu_Compactor) {
307 bu_Compactor->retain();
308 bu_Compactor->suspend();
309 }
310
311 // Build the list of repositories to be backed up.
312 lock_(repo_list);
313
314 new_(bu_BackupList, CSVector(repo_list->size()));
315 for (uint32_t i = 0; i<repo_list->size(); i++) {
316 if ((repo = (MSRepository *) repo_list->get(i))) {
317 if (!repo->isRemovingFP && !repo->mustBeDeleted) {
318 bu_BackupList->add(RETAIN(repo));
319 if (repo->initBackup() == REPO_COMPACTING)
320 compacting = true;
321
322 if (!repo->myRepoHeadSize) {
323 /* The file has not yet been opened, so the
324 * garbage count will not be known!
325 */
326 MSRepoFile *repo_file;
327
328 //repo->retain();
329 //unlock_(myRepostoryList);
330 //push_(repo);
331 repo_file = repo->openRepoFile();
332 repo_file->release();
333 //release_(repo);
334 //lock_(myRepostoryList);
335 //goto retry;
336 }
337
338 bu_size += repo->myRepoFileSize;
339
340 }
341 }
342 }
343
344 // Copy the table list to the backup database:
345 uint32_t next_tab = 0;
346 MSTable *tab;
347 while ((tab = bu_SourceDatabase->getNextTable(&next_tab))) {
348 push_(tab);
349 bu_Database->addTable(tab->myTableID, tab->myTableName->getCString(), 0, false);
350 release_(tab);
351 }
352 unlock_(repo_list);
353
354 // Copy over any physical PBMS system tables.
355 PBMSSystemTables::transferSystemTables(RETAIN(bu_Database), RETAIN(bu_SourceDatabase));
356
357 // Load the system tables into the backup database. This will
358 // initialize the database for cloud storage if required.
359 PBMSSystemTables::loadSystemTables(RETAIN(bu_Database));
360
361 // Set the cloud backup info.
362 bu_Database->myBlobCloud->cl_setBackupInfo(RETAIN(bu_info));
363
364
365 // Set the backup number in the pbms_variable tabe. (This is a hidden value.)
366 // This value is used in case a drag and drop restore was done. When a data base is
367 // first loaded this value is checked and if it is not zero then the backup record
368 // will be read and any used to recover any BLOBs.
369 //
370 char value[20];
371 snprintf(value, 20, "%"PRIu32"", bu_info->getBackupRefId());
372 MSVariableTable::setVariable(RETAIN(bu_Database), BACKUP_NUMBER_VAR, value);
373
374 // Once the repositories are locked the compactor can be restarted
375 // unless it is in the process of compacting a repository that is
376 // being backed up.
377 if (bu_Compactor && !compacting) {
378 bu_Compactor->resume();
379 bu_Compactor->release();
380 bu_Compactor = NULL;
381 }
382
383 // Suspend the transaction writer while the backup is running.
384 MSTransactionManager::suspend(true);
385 bu_TransactionManagerSuspended = true;
386
387 // Start the backup daemon thread.
388 bu_ID = bu_start_time = time(NULL);
389 start();
390
391 cleanup->cancelCleanUp();
392 release_(cleanup);
393
394 exit_();
395}
396
397void MSBackup::completeBackup()
398{
399 if (bu_TransactionManagerSuspended) {
400 MSTransactionManager::resume();
401 bu_TransactionManagerSuspended = false;
402 }
403
404 if (bu_BackupList) {
405 MSRepository *repo;
406
407 while (bu_BackupList->size()) {
408 repo = (MSRepository *) bu_BackupList->take(0);
409 if (repo) {
410 repo->backupCompleted();
411 repo->release();
412 }
413 }
414 bu_BackupList->release();
415 bu_BackupList = NULL;
416 }
417
418 if (bu_Compactor) {
419 bu_Compactor->resume();
420 bu_Compactor->release();
421 bu_Compactor = NULL;
422 }
423
424 if (bu_Database) {
425 if (bu_State == BU_COMPLETED)
426 bu_Database->releaseBackupDatabase();
427 else
428 MSDatabase::dropDatabase(bu_Database);
429
430 bu_Database = NULL;
431 }
432
433 if (bu_SourceDatabase){
434 if (bu_State == BU_COMPLETED)
435 bu_info->backupCompleted(bu_SourceDatabase);
436 else
437 bu_info->backupTerminated(bu_SourceDatabase);
438
439 bu_SourceDatabase = NULL;
440 bu_info->release();
441 bu_info = NULL;
442 }
443
444 bu_BackupRunning = false;
445}
446
447bool MSBackup::doWork()
448{
449 enter_();
450 try_(a) {
451 CSMutex *my_lock;
452 MSRepository *src_repo, *dst_repo;
453 MSRepoFile *src_file, *dst_file;
454 off64_t src_offset, prev_offset;
455 uint16_t head_size;
456 uint64_t blob_size, blob_data_size;
457 CSStringBuffer *head;
458 MSRepoPointersRec ptr;
459 uint32_t table_ref_count;
460 uint32_t blob_ref_count;
461 int ref_count;
462 size_t ref_size;
463 uint32_t auth_code;
464 uint32_t tab_id;
465 uint64_t blob_id;
466 MSOpenTable *otab;
467 uint32_t src_repo_id;
468 uint8_t status;
469 uint8_t blob_storage_type;
470 uint16_t tab_index;
471 uint32_t mod_time;
472 char *transferBuffer;
473 CloudKeyRec cloud_key;
474
475
476 bu_BackupRunning = true;
477 bu_State = BU_RUNNING;
478
479 /*
480 // For testing:
481 {
482 int blockit = 0;
483 myWaitTime = 5 * 1000; // Time in milli-seconds
484 while (blockit)
485 return_(true);
486 }
487 */
488
489 transferBuffer = (char*) cs_malloc(MS_BACKUP_BUFFER_SIZE);
490 push_ptr_(transferBuffer);
491
492 new_(head, CSStringBuffer(100));
493 push_(head);
494
495 src_repo = (MSRepository*)bu_BackupList->get(0);
496 while (src_repo && !myMustQuit) {
497 src_offset = 0;
498 src_file = src_repo->openRepoFile();
499 push_(src_file);
500
501 dst_repo = bu_Database->lockRepo(src_repo->myRepoFileSize - src_repo->myGarbageCount);
502 frompool_(dst_repo);
503 dst_file = dst_repo->openRepoFile();
504 push_(dst_file);
505
506 src_repo_id = src_repo->myRepoID;
507 src_offset = src_repo->myRepoHeadSize;
508 prev_offset = 0;
509 while (src_offset < src_repo->myRepoFileSize) {
510 retry_read:
511
512 bu_completed += src_offset - prev_offset;
513 prev_offset = src_offset;
514 suspended();
515
516 if (myMustQuit)
517 break;
518
519 // A lock is required here because references and dereferences to the
520 // BLOBs can result in the repository record being updated while
521 // it is being copied.
522 my_lock = &src_repo->myRepoLock[src_offset % CS_REPO_REC_LOCK_COUNT];
523 lock_(my_lock);
524 head->setLength(src_repo->myRepoBlobHeadSize);
525 if (src_file->read(head->getBuffer(0), src_offset, src_repo->myRepoBlobHeadSize, 0) < src_repo->myRepoBlobHeadSize) {
526 unlock_(my_lock);
527 break;
528 }
529
530 ptr.rp_chars = head->getBuffer(0);
531 ref_size = CS_GET_DISK_1(ptr.rp_head->rb_ref_size_1);
532 ref_count = CS_GET_DISK_2(ptr.rp_head->rb_ref_count_2);
533 head_size = CS_GET_DISK_2(ptr.rp_head->rb_head_size_2);
534 blob_size = CS_GET_DISK_6(ptr.rp_head->rb_blob_repo_size_6);
535 blob_data_size = CS_GET_DISK_6(ptr.rp_head->rb_blob_data_size_6);
536 auth_code = CS_GET_DISK_4(ptr.rp_head->rb_auth_code_4);
537 status = CS_GET_DISK_1(ptr.rp_head->rb_status_1);
538 mod_time = CS_GET_DISK_4(ptr.rp_head->rb_mod_time_4);
539
540 blob_storage_type = CS_GET_DISK_1(ptr.rp_head->rb_storage_type_1);
541 if (blob_storage_type == MS_CLOUD_STORAGE) {
542 MSRepoFile::getBlobKey(ptr.rp_head, &cloud_key);
543 }
544
545 // If the BLOB was modified after the start of the backup
546 // then set the mod time to the backup time to ensure that
547 // a backup for update will work correctly.
548 if (mod_time > bu_start_time)
549 CS_SET_DISK_4(ptr.rp_head->rb_mod_time_4, bu_start_time);
550
551 // If the BLOB was moved during the time of this backup then copy
552 // it to the backup location as a referenced BLOB.
553 if ((status == MS_BLOB_MOVED) && (bu_ID == (uint32_t) CS_GET_DISK_4(ptr.rp_head->rb_backup_id_4))) {
554 status = MS_BLOB_REFERENCED;
555 CS_SET_DISK_1(ptr.rp_head->rb_status_1, status);
556 }
557
558 // sanity check
559 if ((blob_data_size == 0) || ref_count <= 0 || ref_size == 0 ||
560 head_size < src_repo->myRepoBlobHeadSize + ref_count * ref_size ||
561 !VALID_BLOB_STATUS(status)) {
562 /* Can't be true. Assume this is garbage! */
563 src_offset++;
564 unlock_(my_lock);
565 continue;
566 }
567
568
569 if ((status == MS_BLOB_REFERENCED) || (status == MS_BLOB_MOVED)) {
570 head->setLength(head_size);
571 if (src_file->read(head->getBuffer(0) + src_repo->myRepoBlobHeadSize, src_offset + src_repo->myRepoBlobHeadSize, head_size - src_repo->myRepoBlobHeadSize, 0) != (head_size- src_repo->myRepoBlobHeadSize)) {
572 unlock_(my_lock);
573 break;
574 }
575
576 table_ref_count = 0;
577 blob_ref_count = 0;
578
579 // Loop through all the references removing temporary references
580 // and counting table and blob references.
581
582 ptr.rp_chars = head->getBuffer(0) + src_repo->myRepoBlobHeadSize;
583 for (int count = 0; count < ref_count; count++) {
584 switch (CS_GET_DISK_2(ptr.rp_ref->rr_type_2)) {
585 case MS_BLOB_FREE_REF:
586 break;
587 case MS_BLOB_TABLE_REF:
588 // Unlike the compactor, table refs are not checked because
589 // they do not yet exist in the backup database.
590 table_ref_count++;
591 break;
592 case MS_BLOB_DELETE_REF:
593 // These are temporary references from the TempLog file.
594 // They are not copied to the backup.
595 CS_SET_DISK_2(ptr.rp_ref->rr_type_2, MS_BLOB_FREE_REF);
596 break;
597 default:
598 // Must be a BLOB reference
599
600 tab_index = CS_GET_DISK_2(ptr.rp_blob_ref->er_table_2);
601 if (tab_index && (tab_index <= ref_count)) {
602 // Only committed references are backed up.
603 if (IS_COMMITTED(CS_GET_DISK_8(ptr.rp_blob_ref->er_blob_ref_id_8))) {
604 MSRepoTableRefPtr tab_ref;
605 tab_ref = (MSRepoTableRefPtr) (head->getBuffer(0) + src_repo->myRepoBlobHeadSize + (tab_index-1) * ref_size);
606 if (CS_GET_DISK_2(tab_ref->rr_type_2) == MS_BLOB_TABLE_REF)
607 blob_ref_count++;
608 } else {
609 CS_SET_DISK_2(ptr.rp_ref->rr_type_2, MS_BLOB_FREE_REF);
610 }
611
612 } else {
613 /* Can't be true. Assume this is garbage! */
614 src_offset++;
615 unlock_(my_lock);
616 goto retry_read;
617 }
618 break;
619 }
620 ptr.rp_chars += ref_size;
621 }
622
623
624 // If there are still blob references then the record needs to be backed up.
625 if (table_ref_count && blob_ref_count) {
626
627 off64_t dst_offset;
628
629 dst_offset = dst_repo->myRepoFileSize;
630
631 /* Write the header. */
632 dst_file->write(head->getBuffer(0), dst_offset, head_size);
633
634 /* Copy the BLOB over: */
635 if (blob_storage_type == MS_CLOUD_STORAGE) {
636 bu_Database->myBlobCloud->cl_backupBLOB(&cloud_key);
637 } else
638 CSFile::transfer(RETAIN(dst_file), dst_offset + head_size, RETAIN(src_file), src_offset + head_size, blob_size, transferBuffer, MS_BACKUP_BUFFER_SIZE);
639
640 /* Update the references: */
641 ptr.rp_chars = head->getBuffer(0) + src_repo->myRepoBlobHeadSize;
642 for (int count = 0; count < ref_count; count++) {
643 switch (CS_GET_DISK_2(ptr.rp_ref->rr_type_2)) {
644 case MS_BLOB_FREE_REF:
645 case MS_BLOB_DELETE_REF:
646 break;
647 case MS_BLOB_TABLE_REF:
648 tab_id = CS_GET_DISK_4(ptr.rp_tab_ref->tr_table_id_4);
649 blob_id = CS_GET_DISK_6(ptr.rp_tab_ref->tr_blob_id_6);
650
651 if ((otab = MSTableList::getOpenTableByID(bu_Database->myDatabaseID, tab_id))) {
652 frompool_(otab);
653 otab->getDBTable()->setBlobHandle(otab, blob_id, dst_repo->myRepoID, dst_offset, blob_size, head_size, auth_code);
654//CSException::throwException(CS_CONTEXT, MS_ERR_NOT_IMPLEMENTED, "What if an error ocurred here!");
655
656 backtopool_(otab);
657 }
658 break;
659 default:
660 break;
661 }
662 ptr.rp_chars += ref_size;
663 }
664
665 dst_repo->myRepoFileSize += head_size + blob_size;
666 }
667 }
668 unlock_(my_lock);
669 src_offset += head_size + blob_size;
670 }
671 bu_completed += src_offset - prev_offset;
672
673 // close the destination repository and cleanup.
674 release_(dst_file);
675 backtopool_(dst_repo);
676 release_(src_file);
677
678 // release the source repository and get the next one in the list.
679 src_repo->backupCompleted();
680 bu_BackupList->remove(0);
681
682 src_repo = (MSRepository*)bu_BackupList->get(0);
683 }
684
685 release_(head);
686 release_(transferBuffer);
687 if (myMustQuit)
688 bu_State = BU_TERMINATED;
689 else
690 bu_State = BU_COMPLETED;
691
692 }
693
694 catch_(a) {
695 logException();
696 }
697
698 cont_(a);
699 completeBackup();
700 myMustQuit = true;
701 return_(true);
702}
703
704void *MSBackup::completeWork()
705{
706 if (bu_SourceDatabase || bu_BackupList || bu_Compactor || bu_info) {
707 // We shouldn't be here
708 CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "MSBackup::completeBackup() not called");
709 if (bu_SourceDatabase) {
710 bu_SourceDatabase->release();
711 bu_SourceDatabase = NULL;
712 }
713
714 if (bu_BackupList) {
715 bu_BackupList->release();
716 bu_BackupList = NULL;
717 }
718
719
720 if (bu_Compactor) {
721 bu_Compactor->release();
722 bu_Compactor = NULL;
723 }
724
725
726 if (bu_info) {
727 bu_info->release();
728 bu_info = NULL;
729 }
730
731 }
732 return NULL;
733}
7340
=== removed file 'plugin/pbms/src/backup_ms.h'
--- plugin/pbms/src/backup_ms.h 2011-03-14 05:40:28 +0000
+++ plugin/pbms/src/backup_ms.h 1970-01-01 00:00:00 +0000
@@ -1,186 +0,0 @@
1/* Copyright (C) 2009 PrimeBase Technologies GmbH, Germany
2 *
3 * PrimeBase Media Stream for MySQL
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * Barry Leslie
20 *
21 * 2009-05-29
22 *
23 * H&G2JCtL
24 *
25 * Repository backup.
26 *
27 */
28
29#pragma once
30#ifndef _BACKUP_MS_H_
31#define _BACKUP_MS_H_
32
33#include <inttypes.h>
34
35class MSDatabase;
36
37class MSBackupInfo : public CSRefObject {
38 friend class StartDumpCleanUp;
39 friend class InsertRowCleanUp;
40
41 private:
42 static uint32_t gMaxInfoRef;
43 static CSSyncSparseArray *gBackupInfo;
44
45 friend class MSBackupTable;
46 friend class MSBackup;
47
48 private:
49 uint32_t backupRefId;
50 CSString *db_name;
51 uint32_t db_id;
52 time_t startTime;
53 time_t completionTime;
54 bool dump;
55 bool isRunning;
56 CSString *backupLocation;
57 uint32_t cloudRef;
58 uint32_t cloudBackupNo;
59
60public:
61
62 static void startUp()
63 {
64 new_(gBackupInfo, CSSyncSparseArray(5));
65 gMaxInfoRef = 0;
66 }
67
68 static void shutDown()
69 {
70 if (gBackupInfo) {
71 gBackupInfo->clear();
72 gBackupInfo->release();
73 gBackupInfo = NULL;
74 }
75 }
76
77
78 static MSBackupInfo *findBackupInfo(uint32_t in_backupRefId)
79 {
80 MSBackupInfo *info;
81 enter_();
82
83 lock_(gBackupInfo);
84
85 info = (MSBackupInfo *) gBackupInfo->get(in_backupRefId);
86 if (info)
87 info->retain();
88 unlock_(gBackupInfo);
89 return_(info);
90 }
91
92 static MSBackupInfo *getBackupInfo(uint32_t in_backupRefId)
93 {
94 MSBackupInfo *info = findBackupInfo(in_backupRefId);
95 if (!info) {
96 enter_();
97 char msg[80];
98 snprintf(msg, 80, "Backup info with reference ID %"PRIu32" not found", in_backupRefId);
99 CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, msg);
100 outer_();
101 }
102 return info;
103 }
104
105
106 MSBackupInfo(uint32_t id, const char *name, uint32_t db_id, time_t start, time_t end, bool isDump, const char *location, uint32_t cloudRef_arg, uint32_t cloudBackupNo_arg );
107 ~MSBackupInfo();
108
109 uint32_t getBackupRefId() { return backupRefId;}
110
111 const char *getName(){ return db_name->getCString(); }
112
113 uint32_t getDatabaseId() { return db_id;}
114
115 time_t getStart(){ return startTime;}
116
117 time_t getEnd(){ return completionTime;}
118
119 bool isDump(){ return dump;}
120
121 bool isBackupRunning(){ return isRunning;}
122
123 const char *getLocation() { return (backupLocation)?backupLocation->getCString():NULL; }
124
125 uint32_t getcloudRef(){ return cloudRef;}
126 void setcloudRef(uint32_t no){ cloudRef = no;}
127
128 uint32_t getcloudBackupNo(){ return cloudBackupNo;}
129 void setcloudBackupNo(uint32_t no){ cloudBackupNo = no;}
130
131 static MSBackupInfo *startDump(MSDatabase *db, uint32_t cloud_ref, uint32_t backup_no);
132
133 void startBackup(MSDatabase *pbms_db);
134 void backupCompleted(MSDatabase *db);
135 void backupTerminated(MSDatabase *db);
136};
137
138
139class MSDatabase;
140class MSOpenSystemTable;
141
142class MSBackup :public CSDaemon {
143
144public:
145
146 MSBackup();
147 ~MSBackup(){} // Do nothing here because 'self' will no longer be valid, use completeWork().
148
149 virtual bool doWork();
150
151 virtual void *completeWork();
152
153 void startBackup(MSDatabase *src_db);
154 uint64_t getBackupSize() { return bu_size;}
155 uint64_t getBackupCompletedSize() { return bu_completed;}
156 bool isRunning() { return bu_BackupRunning;}
157 int getStatus() { return (bu_BackupRunning)?0:bu_State;}
158 uint32_t backupID() { return bu_ID;}
159
160 static MSBackup* newMSBackup(MSBackupInfo *backup_info);
161
162 friend class StartBackupCleanUp;
163private:
164 void completeBackup();
165
166 MSBackupInfo *bu_info;
167
168 CSVector *bu_BackupList;
169 CSDaemon *bu_Compactor;
170 bool bu_BackupRunning;
171 enum {BU_RUNNING = -1, BU_COMPLETED = 0, BU_TERMINATED = 1} bu_State;
172
173 MSDatabase *bu_SourceDatabase; // The source database.
174 MSDatabase *bu_Database; // The destination database.
175 MSOpenSystemTable *bu_dst_dump; // The source database's pbms_dump.
176 MSOpenSystemTable *bu_src_dump; // The source database's pbms_dump.
177 uint64_t bu_size; // The total size of the data to be backed up.
178 uint64_t bu_completed; // The amount of data that has been backed up so far.
179
180 uint32_t bu_ID;
181 uint32_t bu_start_time;
182
183 bool bu_TransactionManagerSuspended;
184};
185
186#endif // _BACKUP_MS_H_
1870
=== added directory 'plugin/pbms/src/cloud'
=== renamed file 'plugin/pbms/src/cloud_ms.cc' => 'plugin/pbms/src/cloud/cloud_ms.cc'
--- plugin/pbms/src/cloud_ms.cc 2011-04-20 22:18:30 +0000
+++ plugin/pbms/src/cloud/cloud_ms.cc 2011-06-01 23:57:43 +0000
@@ -20,15 +20,13 @@
20 *20 *
21 */21 */
22 22
23#include "includes/pbms_config.h"
24
23#ifdef DRIZZLED25#ifdef DRIZZLED
24#include <config.h>
25#include <drizzled/common.h>
26#include <drizzled/session.h>
27#include <drizzled/table.h>26#include <drizzled/table.h>
28#include <drizzled/message/table.pb.h>27#include <drizzled/message/table.pb.h>
29#include <drizzled/charset.h>28#include <drizzled/charset.h>
30#include <drizzled/table_proto.h>29#include <drizzled/table_proto.h>
31#include <drizzled/session.h>
32#include <drizzled/field.h>30#include <drizzled/field.h>
33#endif31#endif
3432
@@ -49,14 +47,11 @@
49#include "cslib/CSEncode.h"47#include "cslib/CSEncode.h"
50#include "cslib/CSS3Protocol.h"48#include "cslib/CSS3Protocol.h"
5149
52#include "backup_ms.h"
53#include "cloud_ms.h"50#include "cloud_ms.h"
5451
55CSSyncSparseArray *MSCloudInfo::gCloudInfo;52#include "lib/pbmslib.h" // Needed for MS_CLOUD_STORAGE
56uint32_t MSCloudInfo::gMaxInfoRef;53#include "database/database_ms.h"
5754
58uint32_t CloudDB::gKeyIndex;
59CSMutex CloudDB::gCloudKeyLock;
6055
61//==============================56//==============================
62MSCloudInfo::MSCloudInfo(uint32_t id,57MSCloudInfo::MSCloudInfo(uint32_t id,
@@ -94,24 +89,50 @@
94}89}
9590
96//-------------------------------91//-------------------------------
92void MSCloudInfo::setServer(const char *server)
93{
94 s3Prot->s3_setServer(server);
95}
96
97//-------------------------------
97const char *MSCloudInfo::getBucket() 98const char *MSCloudInfo::getBucket()
98{ 99{
99 return bucket->getCString();100 return bucket->getCString();
100}101}
101102
102//-------------------------------103//-------------------------------
104void MSCloudInfo::setBucket(const char *bucket_arg)
105{
106 if (bucket)
107 bucket->release();
108 bucket = CSString::newString(bucket_arg);
109}
110
111//-------------------------------
103const char *MSCloudInfo::getPublicKey() 112const char *MSCloudInfo::getPublicKey()
104{ 113{
105 return s3Prot->s3_getPublicKey();114 return s3Prot->s3_getPublicKey();
106}115}
107116
108//-------------------------------117//-------------------------------
118void MSCloudInfo::setPublicKey(const char *key)
119{
120 s3Prot->s3_setPublicKey(key);
121}
122
123//-------------------------------
109const char *MSCloudInfo::getPrivateKey() 124const char *MSCloudInfo::getPrivateKey()
110{ 125{
111 return s3Prot->s3_getPrivateKey();126 return s3Prot->s3_getPrivateKey();
112}127}
113128
114//-------------------------------129//-------------------------------
130void MSCloudInfo::setPrivateKey(const char *key)
131{
132 s3Prot->s3_setPrivateKey(key);
133}
134
135//-------------------------------
115CSString *MSCloudInfo::getSignature(const char *key, const char *content_type, uint32_t *s3AuthorizationTime)136CSString *MSCloudInfo::getSignature(const char *key, const char *content_type, uint32_t *s3AuthorizationTime)
116{137{
117 return s3Prot->s3_getAuthorization(bucket->getCString(), key, content_type, s3AuthorizationTime);138 return s3Prot->s3_getAuthorization(bucket->getCString(), key, content_type, s3AuthorizationTime);
@@ -183,20 +204,18 @@
183}204}
184205
185//==============================206//==============================
186CloudDB::CloudDB(uint32_t db_id):207CloudDB::CloudDB(MSDatabase *db_backref):
187 dfltCloudRefId(0),208 dfltCloudRefId(0),
188 keep_alive(5 * 60),// default URL keep alive in seconds.209 keep_alive(5 * 60),// default URL keep alive in seconds.
189 blob_recovery_no(0),
190 blob_db_id(db_id),
191 isBackup(false),
192 backupInfo(NULL),
193 backupCloud(NULL),
194 clObjectKey(NULL)210 clObjectKey(NULL)
195{211{
196 enter_();212 enter_();
197213
214 cl_db = db_backref;
198 new_(clObjectKey, CSStringBuffer());215 new_(clObjectKey, CSStringBuffer());
199 clObjectKey->setLength(base_key_size);216 clObjectKey->setLength(CloudObjectKey::base_key_size);
217
218 new_(clCloudInfo, CSSyncSparseArray(5));
200 219
201 exit_();220 exit_();
202}221}
@@ -205,234 +224,120 @@
205CloudDB::~CloudDB()224CloudDB::~CloudDB()
206{225{
207 226
208 if (backupInfo)
209 backupInfo->release();
210
211 if (backupCloud)
212 backupCloud->release();
213
214 if (clObjectKey)227 if (clObjectKey)
215 clObjectKey->release();228 clObjectKey->release();
216 229
217}230 if (clCloudInfo) {
218//-------------------------------231 clCloudInfo->clear();
219MSBackupInfo *CloudDB::cl_getBackupInfo()232 clCloudInfo->release();
233 }
234
235}
236
237//-------------------------------
238MSCloudInfo *CloudDB::getCloudInfo(uint32_t in_cloudRefId)
239{
240 MSCloudInfo *info;
241 enter_();
242
243 lock_(this);
244
245 info = (MSCloudInfo *) clCloudInfo->get(in_cloudRefId);
246 if (!info) {
247 char msg[80];
248 snprintf(msg, 80, "Cloud info with reference ID %"PRIu32" not found", in_cloudRefId);
249 CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, msg);
250 }
251 info->retain();
252 unlock_(this);
253 return_(info);
254}
255
256//-------------------------------
257void CloudDB::setDefaultCloudRef(uint32_t dflt)
220{ 258{
221 if (backupInfo)259 dfltCloudRefId = dflt;
222 backupInfo->retain();260 cl_db->setBlobType((dflt)?MS_CLOUD_STORAGE:MS_STANDARD_STORAGE);
223 261}
224 return backupInfo;262
225}263//-------------------------------
226264uint32_t CloudDB::newCloudInfo(uint32_t id, const char *server, const char *bucket, const char *publicKey, const char *privateKey, bool isDefault )
227//-------------------------------265{
228void CloudDB::cl_clearBackupInfo(){ backupInfo->release(); backupInfo = NULL;}266 MSCloudInfo *info;
229267 enter_();
230//-------------------------------268
231void CloudDB::cl_createDB()269 if (id == 0)
270 id = clCloudInfo->maxIndex() + 1;
271
272 new_(info, MSCloudInfo(id, server, bucket, publicKey, privateKey));
273
274 push_(info);
275 lock_(this);
276 clCloudInfo->set(id, info);
277 unlock_(this);
278 pop_(info);
279
280 if (isDefault)
281 setDefaultCloudRef(id);
282
283 return_(id);
284}
285
286//-------------------------------
287void CloudDB::deleteCloudInfo(uint32_t id)
288{
289 enter_();
290 lock_(this);
291 if (id == dfltCloudRefId)
292 setDefaultCloudRef(0);
293
294 MSCloudInfo *info = (MSCloudInfo*) clCloudInfo->take(id);
295 if (info) info->release();
296 unlock_(this);
297 exit_();
298}
299
300//-------------------------------
301void CloudDB::getNewKey(CloudKeyPtr key)
302{
303 enter_();
304 lock_(this);
305
306 key->creation_time = time(NULL);
307 if (keyIndex == 0) keyIndex++;
308 key->ref_index = keyIndex++;
309 key->cloud_ref = dfltCloudRefId;
310
311 unlock_(this);
312 exit_();
313}
314
315//-------------------------------
316CSString *CloudDB::getObjectKey(CloudKeyPtr key)
317{
318 CloudObjectKey *objectKey;
319 enter_();
320
321 new_(objectKey, CloudObjectKey(cl_db->getDatabaseID()));
322 push_(objectKey);
323
324 objectKey->setObjectKey(key);
325
326 CSString *str = CSString::newString(objectKey->getCString());
327 release_(objectKey);
328
329 return_(str);
330}
331
332//-------------------------------
333void CloudDB::createDB()
232{334{
233// This is a no-op. 335// This is a no-op.
234}336}
235337
236//-------------------------------338//-------------------------------
237// Restore all the
238void CloudDB::cl_restoreDB()
239{
240 CSVector *list = NULL;
241 CSString *key = NULL;
242 CloudObjectKey *src_objectKey = NULL, *dst_objectKey = NULL;
243 CloudKeyRec cloudKey;
244 uint32_t src_cloudRef, dst_cloudRef = 0;
245 MSBackupInfo *backup_info = NULL;
246 MSCloudInfo *src_cloud = NULL, *dst_cloud = NULL;
247 enter_();
248
249 if (!blob_recovery_no)
250 exit_(); // nothing to do.
251
252 backup_info = MSBackupInfo::getBackupInfo(blob_recovery_no);
253 push_(backup_info);
254
255 src_cloudRef = backup_info->getcloudRef();
256 src_cloud = MSCloudInfo::getCloudInfo(src_cloudRef);
257 push_(src_cloud);
258
259 new_(dst_objectKey, CloudObjectKey(blob_db_id));
260 push_(dst_objectKey);
261
262 // Get the key for the backup BLOB
263 new_(src_objectKey, CloudObjectKey(blob_db_id));
264 push_(src_objectKey);
265 src_objectKey->setObjectKey(NULL, backup_info->getcloudBackupNo(), backup_info->getDatabaseId());
266
267 // Get a list of all the BLOBs that were backed up.
268 list = src_cloud->list(src_objectKey->getCString());
269 release_(src_objectKey);
270 push_(list);
271
272
273 // Go through the list copying the keys.
274 dst_cloudRef = src_cloudRef;
275 dst_cloud = src_cloud;
276 dst_cloud->retain();
277
278 push_ref_(dst_cloud); // Push a reference to dst_cloud so that what ever it references will be released.
279
280 while ((key = (CSString*)(list->take(0))) ) {
281 push_(key);
282
283 // The source key name must be parsed to get its
284 // destination cloud reference. The destination for
285 // the BLOBs may not all be in the same cloud.
286 CloudObjectKey::parseObjectKey(key->getCString(), &cloudKey);
287
288 // Reset the destination cloud if required.
289 if (cloudKey.cloud_ref != dst_cloudRef) {
290 if (dst_cloud) {
291 dst_cloud->release();
292 dst_cloud = NULL;
293 }
294 dst_cloudRef = cloudKey.cloud_ref;
295 dst_cloud = MSCloudInfo::getCloudInfo(dst_cloudRef);
296 }
297
298 // Copy the BLOB to the recovered database.
299 dst_objectKey->setObjectKey(&cloudKey);
300 src_cloud->copy(RETAIN(dst_cloud), dst_objectKey->getCString(), key->getCString());
301 release_(key);
302
303 }
304
305 release_(dst_cloud);
306
307 blob_recovery_no = 0;
308 release_(list);
309 release_(dst_objectKey);
310 release_(src_cloud);
311 release_(backup_info);
312 exit_();
313}
314
315//-------------------------------
316uint32_t CloudDB::cl_getNextBackupNumber(uint32_t cloud_ref)
317{
318 CloudObjectKey *objectKey;
319 CSVector *list;
320 uint32_t backup_no = 0, size = 1;
321 MSCloudInfo *s3Cloud;
322 enter_();
323
324 s3Cloud = MSCloudInfo::getCloudInfo((cloud_ref)?cloud_ref:dfltCloudRefId);
325 push_(s3Cloud);
326
327 new_(objectKey, CloudObjectKey(blob_db_id));
328 push_(objectKey);
329
330 // Find the next available backup number
331 while (size) {
332 backup_no++;
333 objectKey->setObjectKey(NULL, backup_no); // use the key prefix with the backup number for listing.
334 list = s3Cloud->list(objectKey->getCString(), 1);
335 size = list->size();
336 list->release();
337 }
338
339 release_(objectKey);
340 release_(s3Cloud);
341
342 return_(backup_no);
343}
344
345//-------------------------------
346void CloudDB::cl_backupBLOB(CloudKeyPtr key)
347{
348 CloudObjectKey *src_objectKey, *dst_objectKey;
349 uint32_t cloudRef, backupNo;
350 MSCloudInfo *src_cloud = NULL, *dst_cloud = NULL;
351 enter_();
352
353 ASSERT(backupInfo);
354
355 if ((cloudRef = backupInfo->getcloudRef()) == 0) {
356 backupInfo->setcloudRef(dfltCloudRefId);
357 cloudRef = dfltCloudRefId;
358 }
359
360 if ((backupNo = backupInfo->getcloudBackupNo()) == 0) {
361 backupNo = cl_getNextBackupNumber(cloudRef);
362 backupInfo->setcloudBackupNo(backupNo);
363 }
364
365 // Set the source object's key
366 new_(src_objectKey, CloudObjectKey(blob_db_id));
367 push_(src_objectKey);
368 src_objectKey->setObjectKey(key);
369
370 // Set the destination object's key
371 new_(dst_objectKey, CloudObjectKey(blob_db_id));
372 push_(dst_objectKey);
373 dst_objectKey->setObjectKey(key, backupNo);
374
375 // Get the source cloud
376 src_cloud = MSCloudInfo::getCloudInfo((key->cloud_ref)?key->cloud_ref:dfltCloudRefId);
377 push_(src_cloud);
378
379 // Copy the object to the destination cloud
380 dst_cloud = MSCloudInfo::getCloudInfo(cloudRef);
381 src_cloud->copy(dst_cloud, dst_objectKey->getCString(), src_objectKey->getCString());
382
383 release_(src_cloud);
384 release_(dst_objectKey);
385 release_(src_objectKey);
386 exit_();
387}
388
389//-------------------------------
390void CloudDB::cl_restoreBLOB(CloudKeyPtr key, uint32_t backup_db_id)
391{
392 CloudObjectKey *src_objectKey, *dst_objectKey;
393 uint32_t cloudRef, backupNo;
394 MSCloudInfo *src_cloud = NULL, *dst_cloud = NULL;
395 enter_();
396
397 ASSERT(backupInfo);
398
399 if ((cloudRef = backupInfo->getcloudRef()) == 0) {
400 backupInfo->setcloudRef(dfltCloudRefId);
401 cloudRef = dfltCloudRefId;
402 }
403
404 if ((backupNo = backupInfo->getcloudBackupNo()) == 0) {
405 backupNo = cl_getNextBackupNumber(cloudRef);
406 backupInfo->setcloudBackupNo(backupNo);
407 }
408
409 // Set the source object's key
410 new_(src_objectKey, CloudObjectKey(backup_db_id));
411 push_(src_objectKey);
412 src_objectKey->setObjectKey(key, backupNo);
413
414 // Set the destination object's key
415 new_(dst_objectKey, CloudObjectKey(blob_db_id));
416 push_(dst_objectKey);
417 dst_objectKey->setObjectKey(key);
418
419 // Get the source cloud
420 src_cloud = MSCloudInfo::getCloudInfo(cloudRef);
421 push_(src_cloud);
422
423 // Copy the object to the destination cloud
424 dst_cloud = MSCloudInfo::getCloudInfo((key->cloud_ref)?key->cloud_ref:dfltCloudRefId);
425 src_cloud->copy(dst_cloud, dst_objectKey->getCString(), src_objectKey->getCString());
426
427 release_(src_cloud);
428 release_(dst_objectKey);
429 release_(src_objectKey);
430 exit_();
431}
432
433//-------------------------------
434// Drop database deletes all objects with the database key prefix339// Drop database deletes all objects with the database key prefix
435void CloudDB::cl_dropDB()340void CloudDB::dropDB()
436{341{
437 CSVector *list;342 CSVector *list;
438 CSString *key;343 CSString *key;
@@ -442,30 +347,20 @@
442 const char *key_str;347 const char *key_str;
443 348
444 enter_();349 enter_();
445 new_(objectKey, CloudObjectKey(blob_db_id));350 new_(objectKey, CloudObjectKey(cl_db->getDatabaseID()));
446 push_(objectKey);351 push_(objectKey);
447 352
448 lock_(MSCloudInfo::gCloudInfo);353 lock_(this);
449354
450 if (isBackup) {355 objectKey->setObjectKey(); // use the key prefix for listing.
451 uint32_t backup_no;
452 if (backupInfo && (backup_no = backupInfo->getcloudBackupNo())) {
453 objectKey->setObjectKey(NULL, backup_no); // use the key prefix for the backup for listing.
454 if ((s3Cloud = MSCloudInfo::getCloudInfo(backupInfo->getcloudRef())))
455 push_(s3Cloud);
456 }
457 } else {
458 objectKey->setObjectKey(); // use the key prefix for listing.
459 i = 0;
460 s3Cloud = (MSCloudInfo*)MSCloudInfo::gCloudInfo->itemAt(i++); // <-- unreferenced object
461 }
462
463 key_str = objectKey->getCString();356 key_str = objectKey->getCString();
464357
465 // For non backup BLOBs all known clouds must be searched 358 // For non backup BLOBs all known clouds must be searched
466 // for possible BLOBs and deleted. The BLOBs belonging to a backup359 // for possible BLOBs and deleted. The BLOBs belonging to a backup
467 // will ever only be in one cloud storage location.360 // will ever only be in one cloud storage location.
468 while (s3Cloud) {361 i = 0;
362 while ((s3Cloud = getCloudInfoAt(i++))) {
363 push_(s3Cloud);
469 list = s3Cloud->list(key_str);364 list = s3Cloud->list(key_str);
470 push_(list);365 push_(list);
471 366
@@ -477,20 +372,16 @@
477 }372 }
478 373
479 release_(list);374 release_(list);
480 if (isBackup) {375 release_(s3Cloud);
481 release_(s3Cloud); // Only the backup s3Cloud needs to be released.
482 s3Cloud = NULL;
483 } else
484 s3Cloud = (MSCloudInfo*)MSCloudInfo::gCloudInfo->itemAt(i++);// <-- unreferenced object
485 }376 }
486 377
487 unlock_(MSCloudInfo::gCloudInfo);378 unlock_(this);
488 release_(objectKey);379 release_(objectKey);
489 exit_();380 exit_();
490}381}
491382
492//-------------------------------383//-------------------------------
493void CloudDB::cl_putData(CloudKeyPtr key, CSInputStream *stream, off64_t size)384void CloudDB::putData(CloudKeyPtr key, CSInputStream *stream, off64_t size)
494{385{
495 CloudObjectKey *objectKey;386 CloudObjectKey *objectKey;
496 MSCloudInfo *s3Cloud;387 MSCloudInfo *s3Cloud;
@@ -499,12 +390,12 @@
499 390
500 push_(stream);391 push_(stream);
501 392
502 new_(objectKey, CloudObjectKey(blob_db_id));393 new_(objectKey, CloudObjectKey(cl_db->getDatabaseID()));
503 push_(objectKey);394 push_(objectKey);
504 395
505 objectKey->setObjectKey(key);396 objectKey->setObjectKey(key);
506 397
507 s3Cloud = MSCloudInfo::getCloudInfo((key->cloud_ref)?key->cloud_ref:dfltCloudRefId);398 s3Cloud = getCloudInfo((key->cloud_ref)?key->cloud_ref:dfltCloudRefId);
508 push_(s3Cloud);399 push_(s3Cloud);
509 s3Cloud->send(RETAIN(stream), objectKey->getCString(), size);400 s3Cloud->send(RETAIN(stream), objectKey->getCString(), size);
510 release_(s3Cloud);401 release_(s3Cloud);
@@ -516,17 +407,17 @@
516}407}
517408
518//-------------------------------409//-------------------------------
519off64_t CloudDB::cl_getData(CloudKeyPtr key, char *buffer, off64_t size)410off64_t CloudDB::getData(CloudKeyPtr key, char *buffer, off64_t size)
520{ 411{
521 CloudObjectKey *objectKey;412 CloudObjectKey *objectKey;
522 CSStaticMemoryOutputStream *output;413 CSStaticMemoryOutputStream *output;
523 MSCloudInfo *s3Cloud;414 MSCloudInfo *s3Cloud;
524 enter_();415 enter_();
525 416
526 new_(objectKey, CloudObjectKey(blob_db_id));417 new_(objectKey, CloudObjectKey(cl_db->getDatabaseID()));
527 push_(objectKey);418 push_(objectKey);
528 419
529 s3Cloud = MSCloudInfo::getCloudInfo(key->cloud_ref);420 s3Cloud = getCloudInfo(key->cloud_ref);
530 push_(s3Cloud);421 push_(s3Cloud);
531422
532 new_(output, CSStaticMemoryOutputStream((u_char *)buffer, size));423 new_(output, CSStaticMemoryOutputStream((u_char *)buffer, size));
@@ -544,16 +435,16 @@
544}435}
545436
546//-------------------------------437//-------------------------------
547void CloudDB::cl_deleteData(CloudKeyPtr key)438void CloudDB::deleteData(CloudKeyPtr key)
548{439{
549 MSCloudInfo *s3Cloud;440 MSCloudInfo *s3Cloud;
550 CloudObjectKey *objectKey;441 CloudObjectKey *objectKey;
551 enter_();442 enter_();
552 443
553 new_(objectKey, CloudObjectKey(blob_db_id));444 new_(objectKey, CloudObjectKey(cl_db->getDatabaseID()));
554 push_(objectKey);445 push_(objectKey);
555 446
556 s3Cloud = MSCloudInfo::getCloudInfo(key->cloud_ref);447 s3Cloud = getCloudInfo(key->cloud_ref);
557 push_(s3Cloud);448 push_(s3Cloud);
558449
559 objectKey->setObjectKey(key);450 objectKey->setObjectKey(key);
@@ -567,19 +458,19 @@
567}458}
568459
569//-------------------------------460//-------------------------------
570CSString *CloudDB::cl_getDataURL(CloudKeyPtr key)461CSString *CloudDB::getDataURL(CloudKeyPtr key)
571{462{
572 CloudObjectKey *objectKey;463 CloudObjectKey *objectKey;
573 CSString *url;464 CSString *url;
574 MSCloudInfo *s3Cloud;465 MSCloudInfo *s3Cloud;
575 enter_();466 enter_();
576 467
577 new_(objectKey, CloudObjectKey(blob_db_id));468 new_(objectKey, CloudObjectKey(cl_db->getDatabaseID()));
578 push_(objectKey);469 push_(objectKey);
579 470
580 objectKey->setObjectKey(key);471 objectKey->setObjectKey(key);
581 472
582 s3Cloud = MSCloudInfo::getCloudInfo(key->cloud_ref); 473 s3Cloud = getCloudInfo(key->cloud_ref);
583 push_(s3Cloud);474 push_(s3Cloud);
584 475
585 url = s3Cloud->getDataURL(objectKey->getCString(), keep_alive);476 url = s3Cloud->getDataURL(objectKey->getCString(), keep_alive);
@@ -591,7 +482,7 @@
591}482}
592483
593//-------------------------------484//-------------------------------
594CSString *CloudDB::cl_getSignature(CloudKeyPtr key, CSString *content_type_arg, uint32_t *s3AuthorizationTime)485CSString *CloudDB::getSignature(CloudKeyPtr key, CSString *content_type_arg, uint32_t *s3AuthorizationTime)
595{486{
596 CSString *signature;487 CSString *signature;
597 CloudObjectKey *objectKey;488 CloudObjectKey *objectKey;
@@ -599,7 +490,7 @@
599 MSCloudInfo *s3Cloud;490 MSCloudInfo *s3Cloud;
600 enter_();491 enter_();
601 492
602 new_(objectKey, CloudObjectKey(blob_db_id));493 new_(objectKey, CloudObjectKey(cl_db->getDatabaseID()));
603 push_(objectKey);494 push_(objectKey);
604 495
605 if (content_type_arg) {496 if (content_type_arg) {
@@ -608,7 +499,7 @@
608 }499 }
609 500
610 objectKey->setObjectKey(key);501 objectKey->setObjectKey(key);
611 s3Cloud = MSCloudInfo::getCloudInfo(key->cloud_ref); 502 s3Cloud = getCloudInfo(key->cloud_ref);
612 push_(s3Cloud);503 push_(s3Cloud);
613 504
614 signature = s3Cloud->getSignature(objectKey->getCString(), content_type, s3AuthorizationTime);505 signature = s3Cloud->getSignature(objectKey->getCString(), content_type, s3AuthorizationTime);
615506
=== renamed file 'plugin/pbms/src/cloud_ms.h' => 'plugin/pbms/src/cloud/cloud_ms.h'
--- plugin/pbms/src/cloud_ms.h 2011-03-14 05:40:28 +0000
+++ plugin/pbms/src/cloud/cloud_ms.h 2011-06-01 23:57:43 +0000
@@ -25,79 +25,47 @@
25#include "cslib/CSMd5.h"25#include "cslib/CSMd5.h"
26#include <inttypes.h>26#include <inttypes.h>
2727
28
29/* NOTES:28/* NOTES:
30 *29 *
31 * - TODO: If cl_deleteData() fails then the BLOB deletion must fail and be rescheduled to try again30 * - TODO: If deleteData() fails then the BLOB deletion must fail and be rescheduled to try again
32 * later.31 * later.
33 * - TODO: Copying of BLOBs from one database to another needs to be handled. Look for copyBlob() and 32 * - TODO: Copying of BLOBs from one database to another needs to be handled. Look for copyBlob() and
34 * resetBlobHead(). There are 3 cases to handle depending on if the databases involved use33 * resetBlobHead(). There are 3 cases to handle depending on if the databases involved use
35 * cload storage.34 * cloud storage.
36 */35 */
37 36
38//===============================37//===============================
39class CSS3Protocol; 38class CSS3Protocol;
40class MSCloudInfo : public CSRefObject {39class MSCloudInfo : public CSRefObject {
41 private:40 private:
42 static uint32_t gMaxInfoRef;
43 static CSSyncSparseArray *gCloudInfo;
4441
45 friend class MSCloudTable;42 friend class MSCloudTable;
46 friend class CloudDB;43 friend class CloudDB;
47 44
48 private: 45 private:
49 uint32_t cloudRefId;46 uint32_t cloudRefId;
50 CSString *bucket;47 CSString *bucket;
51 CSS3Protocol *s3Prot;48 CSS3Protocol *s3Prot;
49 bool defaultCloudRef;
52 50
53public:51public:
5452
55 static void startUp()53 MSCloudInfo(uint32_t id, const char *server, const char *bucket, const char *publicKey, const char *privateKey);
56 {
57 new_(gCloudInfo, CSSyncSparseArray(5));
58 gMaxInfoRef = 0;
59 }
60
61 static void shutDown()
62 {
63 if (gCloudInfo) {
64 gCloudInfo->clear();
65 gCloudInfo->release();
66 gCloudInfo = NULL;
67 }
68 }
69
70
71 static MSCloudInfo *getCloudInfo(uint32_t in_cloudRefId)
72 {
73 MSCloudInfo *info;
74 enter_();
75
76 lock_(gCloudInfo);
77
78 info = (MSCloudInfo *) gCloudInfo->get(in_cloudRefId);
79 if (!info) {
80 char msg[80];
81 snprintf(msg, 80, "Cloud info with reference ID %"PRIu32" not found", in_cloudRefId);
82 CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, msg);
83 }
84 info->retain();
85 unlock_(gCloudInfo);
86 return_(info);
87 }
88
89 MSCloudInfo(uint32_t id, const char *server, const char *bucket, const char *publicKey, const char *privateKey );
90 ~MSCloudInfo();54 ~MSCloudInfo();
91 55
92 uint32_t getCloudRefId() { return cloudRefId;}56 uint32_t getCloudRefId() { return cloudRefId;}
93 57
94 const char *getServer();58 const char *getServer();
59 void setServer(const char *server);
95 60
96 const char *getBucket();61 const char *getBucket();
62 void setBucket(const char *bucket);
97 63
98 const char *getPublicKey();64 const char *getPublicKey();
65 void setPublicKey(const char *key);
99 66
100 const char *getPrivateKey();67 const char *getPrivateKey();
68 void setPrivateKey(const char *key);
10169
102 CSString *getSignature(const char *key, const char *content_type, uint32_t *s3AuthorizationTime);70 CSString *getSignature(const char *key, const char *content_type, uint32_t *s3AuthorizationTime);
103 71
@@ -124,13 +92,13 @@
124//===============================92//===============================
125class CloudObjectKey : public CSStringBuffer93class CloudObjectKey : public CSStringBuffer
126{94{
127 uint32_t default_db_id;95 uint16_t default_db_id;
128 96
129 public:97 public:
130 CloudObjectKey(uint32_t id): CSStringBuffer(), default_db_id(id){ }98 CloudObjectKey(uint16_t id): CSStringBuffer(), default_db_id(id){ }
131 ~CloudObjectKey(){}99 ~CloudObjectKey(){}
132 100
133 static const uint32_t base_key_size = 64; // enough space for <db_id>/<backup_id>/<creation_time>/<ref_index>101 static const uint32_t base_key_size = 64; // enough space for <db_id>/<creation_time>/<ref_index>
134102
135 void setObjectKey(const char *object_key)103 void setObjectKey(const char *object_key)
136 {104 {
@@ -139,129 +107,83 @@
139 snprintf(getBuffer(0), length(), "%"PRIu32"/0/%s",default_db_id, object_key);107 snprintf(getBuffer(0), length(), "%"PRIu32"/0/%s",default_db_id, object_key);
140 }108 }
141 109
142 void setObjectKey(CloudKeyPtr key = NULL, uint32_t backup_id = 0, uint32_t db_id = 0)110 void setObjectKey(CloudKeyPtr key = NULL, uint32_t db_id = 0)
143 {111 {
144 if (!db_id) db_id = default_db_id;112 if (!db_id) db_id = default_db_id;
145 setLength(base_key_size);113 setLength(base_key_size);
146 114
147 if (key)115 if (key)
148 snprintf(getBuffer(0), length(), "%"PRIu32"/%"PRIu32"/%"PRIu32".%"PRIu32".%"PRIu32"", db_id, backup_id, key->cloud_ref, key->creation_time, key->ref_index);116 snprintf(getBuffer(0), length(), "%"PRIu32"/%"PRIu32".%"PRIu32".%"PRIu32"", db_id, key->cloud_ref, key->creation_time, key->ref_index);
149 else 117 else
150 snprintf(getBuffer(0), length(), "%"PRIu32"/%"PRIu32"s/", db_id, backup_id);118 snprintf(getBuffer(0), length(), "%"PRIu32"/", db_id);
151 119
152 }120 }
153 121
154 static void parseObjectKey(const char *object_key, CloudKeyPtr key, uint32_t *backup_id = NULL, uint32_t *db_id = NULL)122 static void parseObjectKey(const char *object_key, CloudKeyPtr key, uint32_t *db_id = NULL)
155 {123 {
156 uint32_t v1;124 uint32_t v1;
157 125
158 if (!backup_id) backup_id = &v1;
159 if (!db_id) db_id = &v1;126 if (!db_id) db_id = &v1;
160 127
161 sscanf(object_key, "%"PRIu32"/%"PRIu32"/%"PRIu32".%"PRIu32".%"PRIu32"", db_id, backup_id, &(key->cloud_ref), &(key->creation_time), &(key->ref_index));128 sscanf(object_key, "%"PRIu32"/%"PRIu32".%"PRIu32".%"PRIu32"", db_id, &(key->cloud_ref), &(key->creation_time), &(key->ref_index));
162 }129 }
163};130};
164131
165//===============================132//===============================
166class MSBackupInfo;133class MSDatabase;
167class CloudDB: public CSRefObject {134class CloudDB: public CSSharedRefObject {
168 135
169private:136private:
170 static uint32_t gKeyIndex;137 CSSyncSparseArray *clCloudInfo;
171 static CSMutex gCloudKeyLock;
172 138
139 uint32_t keyIndex;
173 uint32_t dfltCloudRefId;140 uint32_t dfltCloudRefId;
174 141
175 uint32_t keep_alive; // The length of time a redirect URL will remain valid. In seconds.142 uint32_t keep_alive; // The length of time a redirect URL will remain valid. In seconds.
176 uint32_t blob_recovery_no; // This is the backup number from which the recovery should be done.143 MSDatabase *cl_db; // A back reference to the database that this object belongs to.
177 uint32_t blob_db_id;144
178
179 bool isBackup;
180 MSBackupInfo *backupInfo;
181 MSCloudInfo *backupCloud;
182
183 static const uint32_t base_key_size = 64; // enough space for <db_id>/<backup_id>/<creation_time>/<ref_index>
184
185public:145public:
186 CSStringBuffer *clObjectKey;146 CSStringBuffer *clObjectKey;
187 147
188 CloudDB(uint32_t db_id);148 CloudDB(MSDatabase *db_backref);
189 ~CloudDB();149 ~CloudDB();
190 150
191 void cl_setDefaultCloudRef(uint32_t dflt) { dfltCloudRefId = dflt;}151 bool isLoaded() { return (clCloudInfo->size() > 0); }
192 uint32_t cl_getDefaultCloudRef() { return dfltCloudRefId;}152 bool isValidID(uint32_t info_id) { return (clCloudInfo->get(info_id) != NULL); }
193153
194 MSCloudInfo *cl_getCloudInfo(uint32_t cloudRefId = 0)154 uint32_t getCloudInfoIndexForID(uint32_t id) { return clCloudInfo->getIndex(id);}
195 {155
196 return MSCloudInfo::getCloudInfo((cloudRefId)?cloudRefId:dfltCloudRefId);156 MSCloudInfo *getCloudInfoAt(uint32_t index)
197 }157 {
198 158 MSCloudInfo *info = (MSCloudInfo *)clCloudInfo->itemAt(index);
199 void cl_getNewKey(CloudKeyPtr key)159 if (!info)
200 {160 return NULL;
201 enter_();161 return RETAIN(info);
202 lock_(&gCloudKeyLock); 162 }
203 163
204 key->creation_time = time(NULL);164 MSCloudInfo *getCloudInfo(uint32_t in_cloudRefId);
205 key->ref_index = gKeyIndex++;165 MSCloudInfo *getDefaultCloudInfo() { return getCloudInfo(dfltCloudRefId); }
206 key->cloud_ref = dfltCloudRefId;166
207 167 uint32_t newCloudInfo(uint32_t id, const char *server, const char *bucket, const char *publicKey, const char *privateKey, bool isDefault );
208 unlock_(&gCloudKeyLock); 168
209 exit_();169
210 }170 void deleteCloudInfo(uint32_t id);
211171 void setDefaultCloudRef(uint32_t dflt);
212 bool cl_mustRecoverBlobs() { return (blob_recovery_no != 0);}172 uint32_t getDefaultCloudRef() { return dfltCloudRefId;}
213 173
214 void cl_setRecoveryNumber(const char *number)174 void getNewKey(CloudKeyPtr key);
215 {175 CSString *getObjectKey(CloudKeyPtr key);
216 blob_recovery_no = atol(number);176 void setKeepAlive(uint32_t keep_alive_arg) {keep_alive = keep_alive_arg;}
217 }177
218178 void createDB();
219 const char *cl_getRecoveryNumber()179 void dropDB();
220 {180 bool dbExists();
221 static char number[20];181
222 182 void putData( CloudKeyPtr key, CSInputStream *stream, off64_t size);
223 snprintf(number, 20, "%"PRIu32"", blob_recovery_no);183 off64_t getData(CloudKeyPtr key, char *data, off64_t size);
224 return number;184 CSString *getDataURL(CloudKeyPtr key);
225 }185 void deleteData(CloudKeyPtr key);
226186 CSString *getSignature(CloudKeyPtr key, CSString *content_type, uint32_t *s3AuthorizationTime);
227 CSString *cl_getObjectKey(CloudKeyPtr key)
228 {
229 CloudObjectKey *objectKey;
230 enter_();
231
232 new_(objectKey, CloudObjectKey(blob_db_id));
233 push_(objectKey);
234
235 objectKey->setObjectKey(key);
236
237 CSString *str = CSString::newString(objectKey->getCString());
238 release_(objectKey);
239
240 return_(str);
241 }
242
243 void cl_setKeepAlive(uint32_t keep_alive_arg) {keep_alive = keep_alive_arg;}
244
245 void cl_createDB();
246 void cl_dropDB();
247 void cl_restoreDB();
248 uint32_t cl_getNextBackupNumber(uint32_t cloud_ref = 0);
249 bool cl_dbExists();
250
251 // setting backup_blob_no to -1 ensures that if the database is dropped no BLOBs will be deleted.
252 void cl_setCloudIsBackup(){ isBackup = true;}
253 void cl_setBackupInfo(MSBackupInfo *info){ backupInfo = info;}
254 MSBackupInfo *cl_getBackupInfo();
255 void cl_clearBackupInfo();
256
257 void cl_backupBLOB(CloudKeyPtr key);
258 void cl_restoreBLOB(CloudKeyPtr key, uint32_t backup_db_id);
259
260 void cl_putData( CloudKeyPtr key, CSInputStream *stream, off64_t size);
261 off64_t cl_getData(CloudKeyPtr key, char *data, off64_t size);
262 CSString *cl_getDataURL(CloudKeyPtr key);
263 void cl_deleteData(CloudKeyPtr key);
264 CSString *cl_getSignature(CloudKeyPtr key, CSString *content_type, uint32_t *s3AuthorizationTime);
265 187
266};188};
267189
268190
=== removed file 'plugin/pbms/src/compactor_ms.cc'
--- plugin/pbms/src/compactor_ms.cc 2010-12-18 04:43:40 +0000
+++ plugin/pbms/src/compactor_ms.cc 1970-01-01 00:00:00 +0000
@@ -1,343 +0,0 @@
1/* Copyright (C) 2008 PrimeBase Technologies GmbH, Germany
2 *
3 * PrimeBase Media Stream for MySQL
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * Original author: Paul McCullagh
20 * Continued development: Barry Leslie
21 *
22 * 2007-07-10
23 *
24 * H&G2JCtL
25 *
26 * Network interface.
27 *
28 */
29
30#include "cslib/CSConfig.h"
31
32#include "defs_ms.h"
33
34#include "cslib/CSGlobal.h"
35#include "cslib/CSStrUtil.h"
36#include "cslib/CSStorage.h"
37
38#include "compactor_ms.h"
39#include "open_table_ms.h"
40#include "repository_ms.h"
41#include "parameters_ms.h"
42
43/*
44 * ---------------------------------------------------------------
45 * COMPACTOR THREAD
46 */
47
48MSCompactorThread::MSCompactorThread(time_t wait_time, MSDatabase *db):
49CSDaemon(wait_time, NULL),
50iCompactorDatabase(db)
51{
52}
53
54void MSCompactorThread::close()
55{
56}
57
58bool MSCompactorThread::doWork()
59{
60 bool complete;
61 MSRepository *src_repo, *dst_repo;
62 MSRepoFile *src_file, *dst_file;
63 uint32_t src_repo_id;
64 MSBlobHeadRec blob;
65 off64_t src_offset;
66 uint16_t head_size;
67 uint64_t blob_size, blob_data_size;
68 CSStringBuffer *head;
69 MSRepoPointersRec ptr;
70 uint32_t table_ref_count;
71 uint32_t blob_ref_count;
72 int ref_count;
73 size_t ref_size;
74 CSMutex *mylock;
75 uint32_t tab_id;
76 uint64_t blob_id;
77 MSOpenTable *otab;
78 uint32_t repo_id;
79 uint64_t repo_offset;
80 uint64_t repo_blob_size;
81 uint16_t repo_head_size;
82 uint16_t tab_index;
83 uint8_t status;
84
85 enter_();
86 retry:
87
88#ifdef MS_COMPACTOR_POLLS
89 if (!(src_repo = iCompactorDatabase->getRepoFullOfTrash(NULL)))
90 return_(true);
91#else
92 myWaitTime = MS_DEFAULT_COMPACTOR_WAIT * 1000; // Time in milli-seconds
93 if (!(src_repo = iCompactorDatabase->getRepoFullOfTrash(&myWaitTime)))
94 return_(true);
95#endif
96 frompool_(src_repo);
97 src_file = src_repo->openRepoFile();
98 push_(src_file);
99
100 dst_repo = iCompactorDatabase->lockRepo(src_repo->myRepoFileSize - src_repo->myGarbageCount);
101 frompool_(dst_repo);
102 dst_file = dst_repo->openRepoFile();
103 push_(dst_file);
104
105 new_(head, CSStringBuffer(100));
106 push_(head);
107
108 complete = false;
109 src_repo_id = src_repo->myRepoID;
110 src_offset = src_repo->myRepoHeadSize;
111 //printf("\nCompacting repo %"PRId32"\n\n", src_repo_id);
112 // For testing:
113 {
114 int blockit = 0;
115 if (blockit) {
116 release_(head);
117 release_(dst_file);
118 backtopool_(dst_repo);
119 release_(src_file);
120 backtopool_(src_repo);
121
122 myWaitTime = 5 * 1000; // Time in milli-seconds
123 return_(true);
124 }
125 }
126 while (src_offset < src_repo->myRepoFileSize) {
127 retry_loop:
128 suspended();
129
130 if (myMustQuit)
131 goto quit;
132 retry_read:
133
134 // A lock is required here because references and dereferences to the
135 // BLOBs can result in the repository record being updated while
136 // it is being copied.
137 mylock = &src_repo->myRepoLock[src_offset % CS_REPO_REC_LOCK_COUNT];
138 lock_(mylock);
139 if (src_file->read(&blob, src_offset, src_repo->myRepoBlobHeadSize, 0) < src_repo->myRepoBlobHeadSize) {
140 unlock_(mylock);
141 break;
142 }
143 ref_size = CS_GET_DISK_1(blob.rb_ref_size_1);
144 ref_count = CS_GET_DISK_2(blob.rb_ref_count_2);
145 head_size = CS_GET_DISK_2(blob.rb_head_size_2);
146 blob_size = CS_GET_DISK_6(blob.rb_blob_repo_size_6);
147 blob_data_size = CS_GET_DISK_6(blob.rb_blob_data_size_6);
148 status = CS_GET_DISK_1(blob.rb_status_1);
149 if ((blob_data_size == 0) || ref_count <= 0 || ref_size == 0 ||
150 head_size < src_repo->myRepoBlobHeadSize + ref_count * ref_size ||
151 !VALID_BLOB_STATUS(status)) {
152 /* Can't be true. Assume this is garbage! */
153 unlock_(mylock);
154 src_offset++;
155 goto retry_read;
156 }
157 if (IN_USE_BLOB_STATUS(status)) {
158 head->setLength(head_size);
159 if (src_file->read(head->getBuffer(0), src_offset, head_size, 0) != head_size) {
160 unlock_(mylock);
161 break;
162 }
163
164 table_ref_count = 0;
165 blob_ref_count = 0;
166
167 ptr.rp_chars = head->getBuffer(0) + src_repo->myRepoBlobHeadSize;
168 for (int count = 0; count < ref_count; count++) {
169 switch (CS_GET_DISK_2(ptr.rp_ref->rr_type_2)) {
170 case MS_BLOB_FREE_REF:
171 break;
172 case MS_BLOB_TABLE_REF:
173 /* Check the reference: */
174 tab_id = CS_GET_DISK_4(ptr.rp_tab_ref->tr_table_id_4);
175 blob_id = CS_GET_DISK_6(ptr.rp_tab_ref->tr_blob_id_6);
176
177 otab = MSTableList::getOpenTableByID(iCompactorDatabase->myDatabaseID, tab_id);
178 if (otab) {
179 frompool_(otab);
180 /* Ignore the return value (it will fail because auth_code is wrong!)!! */
181 uint32_t auth_code = 0;
182 otab->getDBTable()->readBlobHandle(otab, blob_id, &auth_code, &repo_id, &repo_offset, &repo_blob_size, &repo_head_size, false);
183 backtopool_(otab);
184 if (repo_id == src_repo_id &&
185 repo_offset == src_offset &&
186 repo_blob_size == blob_data_size &&
187 repo_head_size == head_size)
188 table_ref_count++;
189 else
190 /* Remove the reference: */
191 CS_SET_DISK_2(ptr.rp_ref->rr_type_2, MS_BLOB_FREE_REF);
192 }
193 else
194 CS_SET_DISK_2(ptr.rp_ref->rr_type_2, MS_BLOB_FREE_REF);
195 break;
196 case MS_BLOB_DELETE_REF:
197 /* These are temporary references from the TempLog file. */
198 /* We try to prevent this from happening, but it can! */
199 uint32_t temp_log_id;
The diff has been truncated for viewing.