Merge lp:~vkolesnikov/pbxt/pbxt-bug-379315 into lp:pbxt

Proposed by Vladimir Kolesnikov
Status: Merged
Merged at revision: not available
Proposed branch: lp:~vkolesnikov/pbxt/pbxt-bug-379315
Merge into: lp:pbxt
Diff against target: None lines
To merge this branch: bzr merge lp:~vkolesnikov/pbxt/pbxt-bug-379315
Reviewer Review Type Date Requested Status
PBXT Core Pending
Review via email: mp+7292@code.launchpad.net
To post a comment you must log in.
lp:~vkolesnikov/pbxt/pbxt-bug-379315 updated
657. By Paul McCullagh

Merged RN245

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'ChangeLog'
2--- ChangeLog 2009-06-03 14:53:00 +0000
3+++ ChangeLog 2009-06-10 16:26:33 +0000
4@@ -3,6 +3,10 @@
5
6 ------- 1.0.08 RC - Not yet released
7
8+RN245: Fixed bug #379315: Inconsistent behavior of DELETE IGNORE and FK constraint
9+
10+RN244: Fixed a recovery problem: during recovery of a "record modified" action, the table was updated before the old index entries were removed, so xres_remove_index_entries() was given the new record, which led to an incorrect index update
11+
12 RN243: Fixed a bug that caused a recovery failure if partitioned pbxt tables were present. This happened because the recovery used a MySQL function to open tables and the PBXT handler was not yet registered
13
14 RN242: Fixed a bug that caused a deadlock if pbxt initialization failed. This happened because pbxt cleanup was done from pbxt_init() with PLUGIN_lock being held by MySQL, which led to a deadlock in the freer thread
15
16=== modified file 'src/ha_pbxt.cc'
17--- src/ha_pbxt.cc 2009-06-03 14:07:54 +0000
18+++ src/ha_pbxt.cc 2009-06-10 16:26:33 +0000
19@@ -1565,7 +1565,11 @@
20 freer_(); // xt_unlock_mutex(share->sh_ex_mutex)
21 }
22
23+#ifdef PBXT_ALLOW_PRINTING
24+static void ha_release_exclusive_use(XTThreadPtr self, XTSharePtr share)
25+#else
26 static void ha_release_exclusive_use(XTThreadPtr XT_UNUSED(self), XTSharePtr share)
27+#endif
28 {
29 XT_PRINT1(self, "ha_release_exclusive_use %s PBXT X UNLOCK\n", share->sh_table_path->ps_path);
30 xt_lock_mutex_ns((xt_mutex_type *) share->sh_ex_mutex);
31@@ -4164,10 +4168,15 @@
32 pb_open_tab->ot_is_modify = FALSE;
33 if ((pb_open_tab->ot_for_update = (lock_type == F_WRLCK))) {
34 switch ((int) thd_sql_command(thd)) {
35+ case SQLCOM_DELETE:
36+ case SQLCOM_DELETE_MULTI:
37+ /* turn DELETE IGNORE into normal DELETE. The IGNORE option causes problems because
38+ * when a record is deleted we add an xlog record which we cannot "rollback" later
39+ * when we find that an FK-constraint has failed.
40+ */
41+ thd->lex->ignore = false;
42 case SQLCOM_UPDATE:
43 case SQLCOM_UPDATE_MULTI:
44- case SQLCOM_DELETE:
45- case SQLCOM_DELETE_MULTI:
46 case SQLCOM_REPLACE:
47 case SQLCOM_REPLACE_SELECT:
48 case SQLCOM_INSERT:
49@@ -4644,7 +4653,7 @@
50 {
51 THD *thd = current_thd;
52 int err = 0;
53- XTThreadPtr self;
54+ XTThreadPtr self = NULL;
55 XTSharePtr share;
56
57 STAT_TRACE(self, *thd_query(thd));
58
59=== modified file 'src/restart_xt.cc'
60--- src/restart_xt.cc 2009-06-03 14:07:54 +0000
61+++ src/restart_xt.cc 2009-06-10 16:27:03 +0000
62@@ -1,3202 +1,3207 @@
63-/* Copyright (c) 2007 PrimeBase Technologies GmbH
64- *
65- * PrimeBase XT
66- *
67- * This program is free software; you can redistribute it and/or modify
68- * it under the terms of the GNU General Public License as published by
69- * the Free Software Foundation; either version 2 of the License, or
70- * (at your option) any later version.
71- *
72- * This program is distributed in the hope that it will be useful,
73- * but WITHOUT ANY WARRANTY; without even the implied warranty of
74- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
75- * GNU General Public License for more details.
76- *
77- * You should have received a copy of the GNU General Public License
78- * along with this program; if not, write to the Free Software
79- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
80- *
81- * 2007-11-12 Paul McCullagh
82- *
83- * H&G2JCtL
84- *
85- * Restart and write data to the database.
86- */
87-
88-#include "xt_config.h"
89-
90-#include <signal.h>
91-#include <time.h>
92-
93-#ifndef DRIZZLED
94-#include "mysql_priv.h"
95-#endif
96-
97-#include "ha_pbxt.h"
98-
99-#include "xactlog_xt.h"
100-#include "database_xt.h"
101-#include "util_xt.h"
102-#include "strutil_xt.h"
103-#include "filesys_xt.h"
104-#include "restart_xt.h"
105-#include "myxt_xt.h"
106-#include "trace_xt.h"
107-
108-#ifdef DEBUG
109-//#define DEBUG_PRINT
110-//#define DEBUG_KEEP_LOGS
111-//#define PRINT_LOG_ON_RECOVERY
112-//#define TRACE_RECORD_DATA
113-//#define SKIP_STARTUP_CHECKPOINT
114-//#define NEVER_CHECKPOINT
115-//#define TRACE_CHECKPOINT
116-#endif
117-
118-#define PRINTF printf
119-//#define PRINTF xt_ftracef
120-//#define PRINTF xt_trace
121-
122-void xt_print_bytes(xtWord1 *buf, u_int len)
123-{
124- for (u_int i=0; i<len; i++) {
125- PRINTF("%02x ", (u_int) *buf);
126- buf++;
127- }
128-}
129-
130-void xt_print_log_record(xtLogID log, xtLogOffset offset, XTXactLogBufferDPtr record)
131-{
132- const char *type = NULL;
133- const char *rec_type = NULL;
134- xtOpSeqNo op_no = 0;
135- xtTableID tab_id = 0;
136- xtRowID row_id = 0;
137- xtRecordID rec_id = 0;
138- xtBool xn_set = FALSE;
139- xtXactID xn_id = 0;
140- char buffer[200];
141- XTTabRecExtDPtr rec_buf;
142- XTTabRecExtDPtr ext_rec;
143- XTTabRecFixDPtr fix_rec;
144- u_int rec_len;
145- xtLogID log_id = 0;
146- xtLogOffset log_offset = 0;
147-
148- rec_buf = NULL;
149- ext_rec = NULL;
150- fix_rec = NULL;
151- rec_len = 0;
152- switch (record->xl.xl_status_1) {
153- case XT_LOG_ENT_REC_MODIFIED:
154- case XT_LOG_ENT_UPDATE:
155- case XT_LOG_ENT_INSERT:
156- case XT_LOG_ENT_DELETE:
157- case XT_LOG_ENT_UPDATE_BG:
158- case XT_LOG_ENT_INSERT_BG:
159- case XT_LOG_ENT_DELETE_BG:
160- op_no = XT_GET_DISK_4(record->xu.xu_op_seq_4);
161- tab_id = XT_GET_DISK_4(record->xu.xu_tab_id_4);
162- rec_id = XT_GET_DISK_4(record->xu.xu_rec_id_4);
163- xn_id = XT_GET_DISK_4(record->xu.xu_xact_id_4);
164- row_id = XT_GET_DISK_4(record->xu.xu_row_id_4);
165- rec_len = XT_GET_DISK_2(record->xu.xu_size_2);
166- xn_set = TRUE;
167- type="rec";
168- rec_buf = (XTTabRecExtDPtr) &record->xu.xu_rec_type_1;
169- ext_rec = (XTTabRecExtDPtr) &record->xu.xu_rec_type_1;
170- if (XT_REC_IS_EXT_DLOG(ext_rec->tr_rec_type_1)) {
171- log_id = XT_GET_DISK_2(ext_rec->re_log_id_2);
172- log_offset = XT_GET_DISK_6(ext_rec->re_log_offs_6);
173- }
174- else {
175- ext_rec = NULL;
176- fix_rec = (XTTabRecFixDPtr) &record->xu.xu_rec_type_1;
177- }
178- break;
179- case XT_LOG_ENT_UPDATE_FL:
180- case XT_LOG_ENT_INSERT_FL:
181- case XT_LOG_ENT_DELETE_FL:
182- case XT_LOG_ENT_UPDATE_FL_BG:
183- case XT_LOG_ENT_INSERT_FL_BG:
184- case XT_LOG_ENT_DELETE_FL_BG:
185- op_no = XT_GET_DISK_4(record->xf.xf_op_seq_4);
186- tab_id = XT_GET_DISK_4(record->xf.xf_tab_id_4);
187- rec_id = XT_GET_DISK_4(record->xf.xf_rec_id_4);
188- xn_id = XT_GET_DISK_4(record->xf.xf_xact_id_4);
189- row_id = XT_GET_DISK_4(record->xf.xf_row_id_4);
190- rec_len = XT_GET_DISK_2(record->xf.xf_size_2);
191- xn_set = TRUE;
192- type="rec";
193- rec_buf = (XTTabRecExtDPtr) &record->xf.xf_rec_type_1;
194- ext_rec = (XTTabRecExtDPtr) &record->xf.xf_rec_type_1;
195- if (XT_REC_IS_EXT_DLOG(ext_rec->tr_rec_type_1)) {
196- log_id = XT_GET_DISK_2(ext_rec->re_log_id_2);
197- log_offset = XT_GET_DISK_6(ext_rec->re_log_offs_6);
198- }
199- else {
200- ext_rec = NULL;
201- fix_rec = (XTTabRecFixDPtr) &record->xf.xf_rec_type_1;
202- }
203- break;
204- case XT_LOG_ENT_REC_FREED:
205- case XT_LOG_ENT_REC_REMOVED:
206- case XT_LOG_ENT_REC_REMOVED_EXT:
207- op_no = XT_GET_DISK_4(record->fr.fr_op_seq_4);
208- tab_id = XT_GET_DISK_4(record->fr.fr_tab_id_4);
209- rec_id = XT_GET_DISK_4(record->fr.fr_rec_id_4);
210- xn_id = XT_GET_DISK_4(record->fr.fr_xact_id_4);
211- xn_set = TRUE;
212- type="rec";
213- break;
214- case XT_LOG_ENT_REC_REMOVED_BI:
215- op_no = XT_GET_DISK_4(record->rb.rb_op_seq_4);
216- tab_id = XT_GET_DISK_4(record->rb.rb_tab_id_4);
217- rec_id = XT_GET_DISK_4(record->rb.rb_rec_id_4);
218- xn_id = XT_GET_DISK_4(record->rb.rb_xact_id_4);
219- row_id = XT_GET_DISK_4(record->rb.rb_row_id_4);
220- rec_len = XT_GET_DISK_2(record->rb.rb_size_2);
221- xn_set = TRUE;
222- type="rec";
223- rec_buf = (XTTabRecExtDPtr) &record->rb.rb_rec_type_1;
224- ext_rec = (XTTabRecExtDPtr) &record->rb.rb_rec_type_1;
225- if (XT_REC_IS_EXT_DLOG(record->rb.rb_rec_type_1)) {
226- log_id = XT_GET_DISK_2(ext_rec->re_log_id_2);
227- log_offset = XT_GET_DISK_6(ext_rec->re_log_offs_6);
228- }
229- else {
230- ext_rec = NULL;
231- fix_rec = (XTTabRecFixDPtr) &record->rb.rb_rec_type_1;
232- }
233- break;
234- case XT_LOG_ENT_REC_MOVED:
235- op_no = XT_GET_DISK_4(record->xw.xw_op_seq_4);
236- tab_id = XT_GET_DISK_4(record->xw.xw_tab_id_4);
237- rec_id = XT_GET_DISK_4(record->xw.xw_rec_id_4);
238- log_id = XT_GET_DISK_2(&record->xw.xw_rec_type_1); // This is actually correct
239- log_offset = XT_GET_DISK_6(record->xw.xw_next_rec_id_4); // This is actually correct!
240- type="rec";
241- break;
242- case XT_LOG_ENT_REC_CLEANED:
243- case XT_LOG_ENT_REC_CLEANED_1:
244- case XT_LOG_ENT_REC_UNLINKED:
245- op_no = XT_GET_DISK_4(record->xw.xw_op_seq_4);
246- tab_id = XT_GET_DISK_4(record->xw.xw_tab_id_4);
247- rec_id = XT_GET_DISK_4(record->xw.xw_rec_id_4);
248- type="rec";
249- break;
250- case XT_LOG_ENT_ROW_NEW:
251- case XT_LOG_ENT_ROW_NEW_FL:
252- case XT_LOG_ENT_ROW_ADD_REC:
253- case XT_LOG_ENT_ROW_SET:
254- case XT_LOG_ENT_ROW_FREED:
255- op_no = XT_GET_DISK_4(record->xa.xa_op_seq_4);
256- tab_id = XT_GET_DISK_4(record->xa.xa_tab_id_4);
257- rec_id = XT_GET_DISK_4(record->xa.xa_row_id_4);
258- type="row";
259- break;
260- case XT_LOG_ENT_NO_OP:
261- op_no = XT_GET_DISK_4(record->no.no_op_seq_4);
262- tab_id = XT_GET_DISK_4(record->no.no_tab_id_4);
263- type="-";
264- break;
265- case XT_LOG_ENT_END_OF_LOG:
266- break;
267- }
268-
269- switch (record->xl.xl_status_1) {
270- case XT_LOG_ENT_HEADER:
271- rec_type = "HEADER";
272- break;
273- case XT_LOG_ENT_NEW_LOG:
274- rec_type = "NEW LOG";
275- break;
276- case XT_LOG_ENT_DEL_LOG:
277- sprintf(buffer, "DEL LOG log=%d ", (int) XT_GET_DISK_4(record->xl.xl_log_id_4));
278- rec_type = buffer;
279- break;
280- case XT_LOG_ENT_NEW_TAB:
281- rec_type = "NEW TABLE";
282- break;
283- case XT_LOG_ENT_COMMIT:
284- rec_type = "COMMIT";
285- xn_id = XT_GET_DISK_4(record->xe.xe_xact_id_4);
286- xn_set = TRUE;
287- break;
288- case XT_LOG_ENT_ABORT:
289- rec_type = "ABORT";
290- xn_id = XT_GET_DISK_4(record->xe.xe_xact_id_4);
291- xn_set = TRUE;
292- break;
293- case XT_LOG_ENT_CLEANUP:
294- rec_type = "CLEANUP";
295- xn_id = XT_GET_DISK_4(record->xc.xc_xact_id_4);
296- xn_set = TRUE;
297- break;
298- case XT_LOG_ENT_REC_MODIFIED:
299- rec_type = "MODIFIED";
300- break;
301- case XT_LOG_ENT_UPDATE:
302- rec_type = "UPDATE";
303- break;
304- case XT_LOG_ENT_UPDATE_FL:
305- rec_type = "UPDATE-FL";
306- break;
307- case XT_LOG_ENT_INSERT:
308- rec_type = "INSERT";
309- break;
310- case XT_LOG_ENT_INSERT_FL:
311- rec_type = "INSERT-FL";
312- break;
313- case XT_LOG_ENT_DELETE:
314- rec_type = "DELETE";
315- break;
316- case XT_LOG_ENT_DELETE_FL:
317- rec_type = "DELETE-FL-BG";
318- break;
319- case XT_LOG_ENT_UPDATE_BG:
320- rec_type = "UPDATE-BG";
321- break;
322- case XT_LOG_ENT_UPDATE_FL_BG:
323- rec_type = "UPDATE-FL-BG";
324- break;
325- case XT_LOG_ENT_INSERT_BG:
326- rec_type = "INSERT-BG";
327- break;
328- case XT_LOG_ENT_INSERT_FL_BG:
329- rec_type = "INSERT-FL-BG";
330- break;
331- case XT_LOG_ENT_DELETE_BG:
332- rec_type = "DELETE-BG";
333- break;
334- case XT_LOG_ENT_DELETE_FL_BG:
335- rec_type = "DELETE-FL-BG";
336- break;
337- case XT_LOG_ENT_REC_FREED:
338- rec_type = "FREE REC";
339- break;
340- case XT_LOG_ENT_REC_REMOVED:
341- rec_type = "REMOVED REC";
342- break;
343- case XT_LOG_ENT_REC_REMOVED_EXT:
344- rec_type = "REMOVED-X REC";
345- break;
346- case XT_LOG_ENT_REC_REMOVED_BI:
347- rec_type = "REMOVED-BI REC";
348- break;
349- case XT_LOG_ENT_REC_MOVED:
350- rec_type = "MOVED REC";
351- break;
352- case XT_LOG_ENT_REC_CLEANED:
353- rec_type = "CLEAN REC";
354- break;
355- case XT_LOG_ENT_REC_CLEANED_1:
356- rec_type = "CLEAN REC-1";
357- break;
358- case XT_LOG_ENT_REC_UNLINKED:
359- rec_type = "UNLINK REC";
360- break;
361- case XT_LOG_ENT_ROW_NEW:
362- rec_type = "NEW ROW";
363- break;
364- case XT_LOG_ENT_ROW_NEW_FL:
365- rec_type = "NEW ROW-FL";
366- break;
367- case XT_LOG_ENT_ROW_ADD_REC:
368- rec_type = "REC ADD ROW";
369- break;
370- case XT_LOG_ENT_ROW_SET:
371- rec_type = "SET ROW";
372- break;
373- case XT_LOG_ENT_ROW_FREED:
374- rec_type = "FREE ROW";
375- break;
376- case XT_LOG_ENT_OP_SYNC:
377- rec_type = "OP SYNC";
378- break;
379- case XT_LOG_ENT_NO_OP:
380- rec_type = "NO OP";
381- break;
382- case XT_LOG_ENT_END_OF_LOG:
383- rec_type = "END OF LOG";
384- break;
385- }
386-
387- if (log)
388- PRINTF("log=%d offset=%d ", (int) log, (int) offset);
389- PRINTF("%s ", rec_type);
390- if (type)
391- PRINTF("op=%lu tab=%lu %s=%lu ", (u_long) op_no, (u_long) tab_id, type, (u_long) rec_id);
392- if (row_id)
393- PRINTF("row=%lu ", (u_long) row_id);
394- if (log_id)
395- PRINTF("log=%lu offset=%lu ", (u_long) log_id, (u_long) log_offset);
396- if (xn_set)
397- PRINTF("xact=%lu ", (u_long) xn_id);
398-
399-#ifdef TRACE_RECORD_DATA
400- if (rec_buf) {
401- switch (rec_buf->tr_rec_type_1 & XT_TAB_STATUS_MASK) {
402- case XT_TAB_STATUS_FREED:
403- PRINTF("FREE");
404- break;
405- case XT_TAB_STATUS_DELETE:
406- PRINTF("DELE");
407- break;
408- case XT_TAB_STATUS_FIXED:
409- PRINTF("FIX-");
410- break;
411- case XT_TAB_STATUS_VARIABLE:
412- PRINTF("VAR-");
413- break;
414- case XT_TAB_STATUS_EXT_DLOG:
415- PRINTF("EXT-");
416- break;
417- }
418- if (rec_buf->tr_rec_type_1 & XT_TAB_STATUS_CLEANED_BIT)
419- PRINTF("C");
420- else
421- PRINTF(" ");
422- }
423- if (ext_rec) {
424- rec_len -= offsetof(XTTabRecExtDRec, re_data);
425- xt_print_bytes((xtWord1 *) ext_rec, offsetof(XTTabRecExtDRec, re_data));
426- PRINTF("| ");
427- if (rec_len > 20)
428- rec_len = 20;
429- xt_print_bytes(ext_rec->re_data, rec_len);
430- }
431- if (fix_rec) {
432- rec_len -= offsetof(XTTabRecFixDRec, rf_data);
433- xt_print_bytes((xtWord1 *) fix_rec, offsetof(XTTabRecFixDRec, rf_data));
434- PRINTF("| ");
435- if (rec_len > 20)
436- rec_len = 20;
437- xt_print_bytes(fix_rec->rf_data, rec_len);
438- }
439-#endif
440-
441- PRINTF("\n");
442-}
443-
#ifdef DEBUG_PRINT
/*
 * Debug helper: read the hard-coded file "./test/test_tab-1.xtr" as an array
 * of 8-byte words and print the index of every word that is zero.
 *
 * The file handle is opened on first use and cached in a static for later
 * calls. The scratch buffer is allocated per call and always released.
 */
void check_rows(void)
{
	static XTOpenFilePtr	of = NULL;
	size_t					size;
	xtWord8					*buffer;

	if (!of)
		of = xt_open_file_ns("./test/test_tab-1.xtr", XT_FS_DEFAULT);
	if (!of)
		return;

	size = (size_t) xt_seek_eof_file(NULL, of);
	/* Do not dereference a failed allocation. */
	if (!(buffer = (xtWord8 *) xt_malloc_ns(size)))
		return;
	/* Only scan the buffer if the read succeeded. */
	if (xt_pread_file(of, 0, size, size, buffer, NULL)) {
		for (size_t i=0; i<size/8; i++) {
			if (!buffer[i])
				printf("%d is NULL\n", (int) i);
		}
	}
	/* Previously leaked on every call. */
	xt_free_ns(buffer);
}

#endif
463-
464-/* ----------------------------------------------------------------------
465- * APPLYING CHANGES IN SEQUENCE
466- */
467-
468-typedef struct XTOperation {
469- xtOpSeqNo or_op_seq;
470- xtWord4 or_op_len;
471- xtLogID or_log_id;
472- xtLogOffset or_log_offset;
473-} XTOperationRec, *XTOperationPtr;
474-
475-static int xres_cmp_op_seq(struct XTThread *XT_UNUSED(self), register const void *XT_UNUSED(thunk), register const void *a, register const void *b)
476-{
477- xtOpSeqNo lf_op_seq = *((xtOpSeqNo *) a);
478- XTOperationPtr lf_ptr = (XTOperationPtr) b;
479-
480- if (lf_op_seq == lf_ptr->or_op_seq)
481- return 0;
482- if (XTTableSeq::xt_op_is_before(lf_op_seq, lf_ptr->or_op_seq))
483- return -1;
484- return 1;
485-}
486-
487-xtPublic void xt_xres_init_tab(XTThreadPtr self, XTTableHPtr tab)
488-{
489- tab->tab_op_list = xt_new_sortedlist(self, sizeof(XTOperationRec), 20, 1000, xres_cmp_op_seq, NULL, NULL, TRUE, FALSE);
490-}
491-
492-xtPublic void xt_xres_exit_tab(XTThreadPtr self, XTTableHPtr tab)
493-{
494- if (tab->tab_op_list) {
495- xt_free_sortedlist(self, tab->tab_op_list);
496- tab->tab_op_list = NULL;
497- }
498-}
499-
500-static xtBool xres_open_table(XTThreadPtr self, XTWriterStatePtr ws, xtTableID tab_id)
501-{
502- XTOpenTablePtr ot;
503-
504- if ((ot = ws->ws_ot)) {
505- if (ot->ot_table->tab_id == tab_id)
506- return OK;
507- xt_db_return_table_to_pool(self, ot);
508- ws->ws_ot = NULL;
509- }
510-
511- if (ws->ws_tab_gone == tab_id)
512- return FAILED;
513- if ((ws->ws_ot = xt_db_open_pool_table(self, ws->ws_db, tab_id, NULL, TRUE))) {
514- XTTableHPtr tab;
515-
516- tab = ws->ws_ot->ot_table;
517- if (!tab->tab_ind_rec_log_id) {
518- /* Should not happen... */
519- tab->tab_ind_rec_log_id = ws->ws_ind_rec_log_id;
520- tab->tab_ind_rec_log_offset = ws->ws_ind_rec_log_offset;
521- }
522- return OK;
523- }
524- ws->ws_tab_gone = tab_id;
525- return FAILED;
526-}
527-
528-/* {INDEX-RECOV_ROWID}
529- * Add missing index entries during recovery.
530- * Set the row ID even if the index entry
531- * is not committed. It will be removed later by
532- * the sweeper.
533- */
534-static xtBool xres_add_index_entries(XTOpenTablePtr ot, xtRowID row_id, xtRecordID rec_id, xtWord1 *rec_data)
535-{
536- XTTableHPtr tab = ot->ot_table;
537- u_int idx_cnt;
538- XTIndexPtr *ind;
539- //XTIdxSearchKeyRec key;
540-
541- if (tab->tab_dic.dic_disable_index)
542- return OK;
543-
544- for (idx_cnt=0, ind=tab->tab_dic.dic_keys; idx_cnt<tab->tab_dic.dic_key_count; idx_cnt++, ind++) {
545- if (!xt_idx_insert(ot, *ind, row_id, rec_id, rec_data, NULL, TRUE)) {
546- /* Check the error, certain errors are recoverable! */
547- XTThreadPtr self = xt_get_self();
548-
549- if (self->t_exception.e_xt_err == XT_SYSTEM_ERROR &&
550- (XT_FILE_IN_USE(self->t_exception.e_sys_err) ||
551- XT_FILE_ACCESS_DENIED(self->t_exception.e_sys_err) ||
552- XT_FILE_TOO_MANY_OPEN(self->t_exception.e_sys_err) ||
553- self->t_exception.e_sys_err == XT_ENOMEM)) {
554- ot->ot_err_index_no = (*ind)->mi_index_no;
555- return FAILED;
556- }
557-
558- /* TODO: Write something to the index header to indicate that
559- * it is corrupted.
560- */
561- tab->tab_dic.dic_disable_index = XT_INDEX_CORRUPTED;
562- xt_log_and_clear_exception_ns();
563- return OK;
564- }
565- }
566- return OK;
567-}
568-
569-static void xres_remove_index_entries(XTOpenTablePtr ot, xtRecordID rec_id, xtWord1 *rec_data)
570-{
571- XTTableHPtr tab = ot->ot_table;
572- u_int idx_cnt;
573- XTIndexPtr *ind;
574-
575- if (tab->tab_dic.dic_disable_index)
576- return;
577-
578- for (idx_cnt=0, ind=tab->tab_dic.dic_keys; idx_cnt<tab->tab_dic.dic_key_count; idx_cnt++, ind++) {
579- if (!xt_idx_delete(ot, *ind, rec_id, rec_data))
580- xt_log_and_clear_exception_ns();
581- }
582-}
583-
584-static xtWord1 *xres_load_record(XTThreadPtr self, XTOpenTablePtr ot, xtRecordID rec_id, xtWord1 *data, size_t red_size, XTInfoBufferPtr rec_buf, u_int cols_req)
585-{
586- XTTableHPtr tab = ot->ot_table;
587- xtWord1 *rec_data;
588-
589- rec_data = ot->ot_row_rbuffer;
590-
591- ASSERT(red_size <= ot->ot_row_rbuf_size);
592- ASSERT(tab->tab_dic.dic_rec_size <= ot->ot_row_rbuf_size);
593- if (data) {
594- if (rec_data != data)
595- memcpy(rec_data, data, red_size);
596- }
597- else {
598- /* It can be that less than 'dic_rec_size' was written for
599- * variable length type records.
600- * If this is the last record in the file, then we will read
601- * less than actual record size.
602- */
603- if (!XT_PREAD_RR_FILE(ot->ot_rec_file, xt_rec_id_to_rec_offset(tab, rec_id), tab->tab_dic.dic_rec_size, 0, rec_data, &red_size, &self->st_statistics.st_rec, self))
604- goto failed;
605-
606- if (red_size < sizeof(XTTabRecHeadDRec))
607- return NULL;
608- }
609-
610- if (XT_REC_IS_FIXED(rec_data[0]))
611- rec_data = ot->ot_row_rbuffer + XT_REC_FIX_HEADER_SIZE;
612- else {
613- if (!xt_ib_alloc(NULL, rec_buf, tab->tab_dic.dic_mysql_buf_size))
614- goto failed;
615- if (XT_REC_IS_VARIABLE(rec_data[0])) {
616- if (!myxt_load_row(ot, rec_data + XT_REC_FIX_HEADER_SIZE, rec_buf->ib_db.db_data, cols_req))
617- goto failed;
618- }
619- else if (XT_REC_IS_EXT_DLOG(rec_data[0])) {
620- if (red_size < XT_REC_EXT_HEADER_SIZE)
621- return NULL;
622-
623- ASSERT(cols_req);
624- if (cols_req && cols_req <= tab->tab_dic.dic_fix_col_count) {
625- if (!myxt_load_row(ot, rec_data + XT_REC_EXT_HEADER_SIZE, rec_buf->ib_db.db_data, cols_req))
626- goto failed;
627- }
628- else {
629- if (!xt_tab_load_ext_data(ot, rec_id, rec_buf->ib_db.db_data, cols_req))
630- goto failed;
631- }
632- }
633- else
634- /* This is possible, the record has already been cleaned up. */
635- return NULL;
636- rec_data = rec_buf->ib_db.db_data;
637- }
638-
639- return rec_data;
640-
641- failed:
642- /* Running out of memory should not be ignored. */
643- if (self->t_exception.e_xt_err == XT_SYSTEM_ERROR &&
644- self->t_exception.e_sys_err == XT_ENOMEM)
645- xt_throw(self);
646- xt_log_and_clear_exception_ns();
647- return NULL;
648-}
649-
650-/*
651- * Apply a change from the log.
652- *
653- * This function is basically very straightforward, were it not
654- * for the option to apply operations out of sequence.
655- * (i.e. in_sequence == FALSE)
656- *
657- * If operations are applied in sequence, then they can be
658- * applied blindly. The update operation is just executed as
659- * it was logged.
660- *
661- * If the changes are not in sequence, then some operations are missing,
662- * however, the operations that are present are in the correct order.
663- *
664- * This can only happen at the end of recovery!!!
665- * After we have applied all operations in the log we may be
666- * left with some operations that have not been applied
667- * because operations were logged out of sequence.
668- *
669- * The application of these operations therefore has to take into
670- * account the current state of the database.
671- * They are then applied in a manner that maintains the
672- * database consistency.
673- *
674- * For example, a record that is freed is freed by placing it
675- * on the current free list. Part of the data logged for the
676- * operation is ignored. Namely: the "next block" pointer
677- * that was originally written into the freed record.
678- */
679-static void xres_apply_change(XTThreadPtr self, XTOpenTablePtr ot, XTXactLogBufferDPtr record, xtBool in_sequence, xtBool check_index, XTInfoBufferPtr rec_buf)
680-{
681- XTTableHPtr tab = ot->ot_table;
682- size_t len;
683- xtRecordID rec_id;
684- xtRefID free_ref_id;
685- XTTabRecFreeDRec free_rec;
686- xtRowID row_id;
687- XTTabRowRefDRec row_buf;
688- XTTabRecHeadDRec rec_head;
689- size_t tfer;
690- xtRecordID link_rec_id, prev_link_rec_id;
691- xtWord1 *rec_data = NULL;
692- XTTabRecFreeDPtr free_data;
693-
694- switch (record->xl.xl_status_1) {
695- case XT_LOG_ENT_REC_MODIFIED:
696- case XT_LOG_ENT_UPDATE:
697- case XT_LOG_ENT_INSERT:
698- case XT_LOG_ENT_DELETE:
699- case XT_LOG_ENT_UPDATE_BG:
700- case XT_LOG_ENT_INSERT_BG:
701- case XT_LOG_ENT_DELETE_BG:
702- rec_id = XT_GET_DISK_4(record->xu.xu_rec_id_4);
703- len = (size_t) XT_GET_DISK_2(record->xu.xu_size_2);
704- if (!XT_PWRITE_RR_FILE(ot->ot_rec_file, xt_rec_id_to_rec_offset(tab, rec_id), len, (xtWord1 *) &record->xu.xu_rec_type_1, &ot->ot_thread->st_statistics.st_rec, ot->ot_thread))
705- xt_throw(self);
706- tab->tab_bytes_to_flush += len;
707-
708- if (check_index && ot->ot_table->tab_dic.dic_key_count) {
709- switch (record->xl.xl_status_1) {
710- case XT_LOG_ENT_DELETE:
711- case XT_LOG_ENT_DELETE_BG:
712- break;
713- case XT_LOG_ENT_REC_MODIFIED:
714- if ((rec_data = xres_load_record(self, ot, rec_id, NULL, 0, rec_buf, tab->tab_dic.dic_ind_cols_req)))
715- xres_remove_index_entries(ot, rec_id, rec_data);
716- /* No break required: */
717- default:
718- if ((rec_data = xres_load_record(self, ot, rec_id, &record->xu.xu_rec_type_1, len, rec_buf, tab->tab_dic.dic_ind_cols_req))) {
719- row_id = XT_GET_DISK_4(record->xu.xu_row_id_4);
720- if (!xres_add_index_entries(ot, row_id, rec_id, rec_data))
721- xt_throw(self);
722- }
723- break;
724- }
725- }
726-
727- if (!in_sequence) {
728- /* A record has been allocated from the EOF, but out of sequence.
729- * This could leave a gap where other records were allocated
730- * from the EOF, but those operations have been lost!
731- * We compensate for this by adding all blocks between
732- * to the free list.
733- */
734- free_rec.rf_rec_type_1 = XT_TAB_STATUS_FREED;
735- free_rec.rf_not_used_1 = 0;
736- while (tab->tab_head_rec_eof_id < rec_id) {
737- XT_SET_DISK_4(free_rec.rf_next_rec_id_4, tab->tab_head_rec_free_id);
738- if (!XT_PWRITE_RR_FILE(ot->ot_rec_file, tab->tab_head_rec_eof_id, sizeof(XTTabRecFreeDRec), (xtWord1 *) &free_rec, &ot->ot_thread->st_statistics.st_rec, ot->ot_thread))
739- xt_throw(self);
740- tab->tab_bytes_to_flush += sizeof(XTTabRecFreeDRec);
741- tab->tab_head_rec_free_id = tab->tab_head_rec_eof_id;
742- tab->tab_head_rec_eof_id++;
743- }
744- }
745- if (tab->tab_head_rec_eof_id < rec_id + 1)
746- tab->tab_head_rec_eof_id = rec_id + 1;
747- tab->tab_flush_pending = TRUE;
748- break;
749- case XT_LOG_ENT_UPDATE_FL:
750- case XT_LOG_ENT_INSERT_FL:
751- case XT_LOG_ENT_DELETE_FL:
752- case XT_LOG_ENT_UPDATE_FL_BG:
753- case XT_LOG_ENT_INSERT_FL_BG:
754- case XT_LOG_ENT_DELETE_FL_BG:
755- rec_id = XT_GET_DISK_4(record->xf.xf_rec_id_4);
756- len = (size_t) XT_GET_DISK_2(record->xf.xf_size_2);
757- free_ref_id = XT_GET_DISK_4(record->xf.xf_free_rec_id_4);
758-
759- if (check_index &&
760- record->xf.xf_status_1 != XT_LOG_ENT_DELETE_FL &&
761- record->xf.xf_status_1 != XT_LOG_ENT_DELETE_FL_BG) {
762- if ((rec_data = xres_load_record(self, ot, rec_id, &record->xf.xf_rec_type_1, len, rec_buf, tab->tab_dic.dic_ind_cols_req))) {
763- row_id = XT_GET_DISK_4(record->xf.xf_row_id_4);
764- if (!xres_add_index_entries(ot, row_id, rec_id, rec_data))
765- xt_throw(self);
766- }
767- }
768-
769- if (!in_sequence) {
770- /* This record was allocated from the free list.
771- * Because this operation is out of sequence, there
772- * could have been other allocations from the
773- * free list before this, that have gone missing.
774- * For this reason we have to search the current
775- * free list and remove the record.
776- */
777- link_rec_id = tab->tab_head_rec_free_id;
778- prev_link_rec_id = 0;
779- while (link_rec_id) {
780- if (!XT_PREAD_RR_FILE(ot->ot_rec_file, xt_rec_id_to_rec_offset(tab, link_rec_id), sizeof(XTTabRecFreeDRec), sizeof(XTTabRecFreeDRec), (xtWord1 *) &free_rec, NULL, &self->st_statistics.st_rec, self))
781- xt_throw(self);
782- if (link_rec_id == rec_id)
783- break;
784- prev_link_rec_id = link_rec_id;
785- link_rec_id = XT_GET_DISK_4(free_rec.rf_next_rec_id_4);
786- }
787- if (link_rec_id == rec_id) {
788- /* The block was found on the free list.
789- * remove it: */
790- if (prev_link_rec_id) {
791- /* We write the record from position 'link_rec_id' into
792- * position 'prev_link_rec_id'. This unlinks 'link_rec_id'!
793- */
794- if (!XT_PWRITE_RR_FILE(ot->ot_rec_file, xt_rec_id_to_rec_offset(tab, prev_link_rec_id), sizeof(XTTabRecFreeDRec), (xtWord1 *) &free_rec, &ot->ot_thread->st_statistics.st_rec, ot->ot_thread))
795- xt_throw(self);
796- tab->tab_bytes_to_flush += sizeof(XTTabRecFreeDRec);
797- free_ref_id = tab->tab_head_rec_free_id;
798- }
799- else
800- /* The block is at the front of the list: */
801- free_ref_id = XT_GET_DISK_4(free_rec.rf_next_rec_id_4);
802- }
803- else {
804- /* Not found on the free list? */
805- if (tab->tab_head_rec_eof_id < rec_id + 1)
806- tab->tab_head_rec_eof_id = rec_id + 1;
807- goto write_mod_data;
808- }
809- }
810- if (tab->tab_head_rec_eof_id < rec_id + 1)
811- tab->tab_head_rec_eof_id = rec_id + 1;
812- tab->tab_head_rec_free_id = free_ref_id;
813- tab->tab_head_rec_fnum--;
814- write_mod_data:
815- if (!XT_PWRITE_RR_FILE(ot->ot_rec_file, xt_rec_id_to_rec_offset(tab, rec_id), len, (xtWord1 *) &record->xf.xf_rec_type_1, &ot->ot_thread->st_statistics.st_rec, ot->ot_thread))
816- xt_throw(self);
817- tab->tab_bytes_to_flush += len;
818- tab->tab_flush_pending = TRUE;
819- break;
820- case XT_LOG_ENT_REC_REMOVED:
821- case XT_LOG_ENT_REC_REMOVED_EXT: {
822- xtBool record_loaded;
823- XTTabRecExtDPtr ext_rec;
824- size_t red_size;
825- xtWord4 log_over_size = 0;
826- xtLogID data_log_id = 0;
827- xtLogOffset data_log_offset = 0;
828- u_int cols_required = 0;
829-
830- rec_id = XT_GET_DISK_4(record->fr.fr_rec_id_4);
831- free_data = (XTTabRecFreeDPtr) &record->fr.fr_rec_type_1;
832-
833- /* This is a short-cut, it does not require loading the record: */
834- if (!check_index && !tab->tab_dic.dic_blob_count && record->fr.fr_status_1 != XT_LOG_ENT_REC_REMOVED_EXT)
835- goto do_rec_freed;
836-
837- ext_rec = (XTTabRecExtDPtr) ot->ot_row_rbuffer;
838-
839- if (!XT_PREAD_RR_FILE(ot->ot_rec_file, xt_rec_id_to_rec_offset(tab, rec_id), tab->tab_dic.dic_rec_size, 0, ext_rec, &red_size, &self->st_statistics.st_rec, self)) {
840- xt_log_and_clear_exception_ns();
841- goto do_rec_freed;
842- }
843-
844- if (red_size < sizeof(XTTabRecHeadDRec))
845- goto do_rec_freed;
846-
847- /* Check that the record is the same as the one originally removed.
848- * This can be different if recovery is repeated.
849- * For example:
850- *
851- * log=21 offset=6304472 REMOVED-X REC op=360616 tab=7 rec=25874
852- * log=21 offset=6309230 UPDATE-FL op=360618 tab=7 rec=25874 row=26667 log=1 offset=26503077 xact=209
853- * log=21 offset=6317500 CLEAN REC op=360631 tab=7 rec=25874
854- *
855- * If this recovery sequence is repeated, then the REMOVED-X will free the
856- * extended record belonging to the update that came afterwards!
857- *
858- * Additional situation to consider:
859- *
860- * - A record "x" is created, and index entries created.
861- * - A checkpoint is made done.
862- * - Record "x" is deleted due to UPDATE.
863- * - The index entries are removed, but the index is not
864- * flushed.
865- * - This deletion is written to disk by the writer.
866- * So we have the situation that the remove is on disk,
867- * but the index changes have not been made.
868- *
869- * In this case, skipping to "do_rec_freed" is incorrect.
870- */
871- if (record->fr.fr_stat_id_1 != ext_rec->tr_stat_id_1 ||
872- XT_GET_DISK_4(record->fr.fr_xact_id_4) != XT_GET_DISK_4(ext_rec->tr_xact_id_4))
873- goto dont_remove_x_record;
874-
875- if (record->xl.xl_status_1 == XT_LOG_ENT_REC_REMOVED_EXT) {
876- if (!XT_REC_IS_EXT_DLOG(ext_rec->tr_rec_type_1))
877- goto dont_remove_x_record;
878- if (red_size < offsetof(XTTabRecExtDRec, re_data))
879- goto dont_remove_x_record;
880-
881- /* Save this for later (can be overwritten by xres_load_record(): */
882- data_log_id = XT_GET_DISK_2(ext_rec->re_log_id_2);
883- data_log_offset = XT_GET_DISK_6(ext_rec->re_log_offs_6);
884- log_over_size = XT_GET_DISK_4(ext_rec->re_log_dat_siz_4);
885- }
886- dont_remove_x_record:
887-
888- record_loaded = FALSE;
889-
890- if (check_index) {
891- cols_required = tab->tab_dic.dic_ind_cols_req;
892- if (tab->tab_dic.dic_blob_cols_req > cols_required)
893- cols_required = tab->tab_dic.dic_blob_cols_req;
894- if (!(rec_data = xres_load_record(self, ot, rec_id, ot->ot_row_rbuffer, red_size, rec_buf, cols_required)))
895- goto do_rec_freed;
896- record_loaded = TRUE;
897- xres_remove_index_entries(ot, rec_id, rec_data);
898- }
899-
900- if (tab->tab_dic.dic_blob_count) {
901- if (!record_loaded) {
902- if (tab->tab_dic.dic_blob_cols_req > cols_required)
903- cols_required = tab->tab_dic.dic_blob_cols_req;
904- if (!(rec_data = xres_load_record(self, ot, rec_id, ot->ot_row_rbuffer, red_size, rec_buf, cols_required)))
905- /* [(7)] REMOVE is followed by FREE:
906- goto get_rec_offset;
907- */
908- goto do_rec_freed;
909- record_loaded = TRUE;
910- }
911-#ifdef XT_STREAMING
912- myxt_release_blobs(ot, rec_data, rec_id);
913-#endif
914- }
915-
916- if (record->xl.xl_status_1 == XT_LOG_ENT_REC_REMOVED_EXT) {
917- /* Note: dlb_delete_log() may be repeated, but should handle this:
918- *
919- * Example:
920- * log=5 offset=213334 CLEAN REC op=28175 tab=1 rec=317428
921- * ...
922- * log=6 offset=321063 REMOVED-X REC op=33878 tab=1 rec=317428
923- *
924- * When this sequence is repeated during recovery, then CLEAN REC
925- * will reset the status byte of the record so that it
926- * comes back to here!
927- *
928- * The check for zero is probably not required here.
929- */
930- if (data_log_id && data_log_offset && log_over_size) {
931- if (!ot->ot_thread->st_dlog_buf.dlb_delete_log(data_log_id, data_log_offset, log_over_size, tab->tab_id, rec_id, self)) {
932- if (ot->ot_thread->t_exception.e_xt_err != XT_ERR_BAD_EXT_RECORD &&
933- ot->ot_thread->t_exception.e_xt_err != XT_ERR_DATA_LOG_NOT_FOUND)
934- xt_log_and_clear_exception_ns();
935- }
936- }
937- }
938-
939- goto do_rec_freed;
940- }
941- case XT_LOG_ENT_REC_REMOVED_BI: {
942- /*
943- * For deletion we need the complete before image because of the following problem.
944- *
945- * DROP TABLE IF EXISTS t1;
946- * CREATE TABLE t1 (ID int primary key auto_increment, value int, index (value)) engine=pbxt;
947- *
948- * insert t1(value) values(50);
949- *
950- * -- CHECKPOINT --
951- *
952- * update t1 set value = 60;
953- *
954- * -- PAUSE --
955- *
956- * update t1 set value = 70;
957- *
958- * -- CRASH --
959- *
960- * select value from t1;
961- * select * from t1;
962- *
963- * 081203 12:11:46 [Note] PBXT: Recovering from 1-148, bytes to read: 33554284
964- * log=1 offset=148 UPDATE-BG op=5 tab=1 rec=2 row=1 xact=3
965- * log=1 offset=188 REC ADD ROW op=6 tab=1 row=1
966- * log=1 offset=206 COMMIT xact=3
967- * log=1 offset=216 REMOVED REC op=7 tab=1 rec=1 xact=2
968- * log=1 offset=241 CLEAN REC op=8 tab=1 rec=2
969- * log=1 offset=261 CLEANUP xact=3
970- * log=1 offset=267 UPDATE-FL-BG op=9 tab=1 rec=1 row=1 xact=4
971- * log=1 offset=311 REC ADD ROW op=10 tab=1 row=1
972- * log=1 offset=329 COMMIT xact=4
973- * log=1 offset=339 REMOVED REC op=11 tab=1 rec=2 xact=3
974- * log=1 offset=364 CLEAN REC op=12 tab=1 rec=1
975- * log=1 offset=384 CLEANUP xact=4
976- * 081203 12:12:15 [Note] PBXT: Recovering complete at 1-390, bytes read: 33554284
977- *
978- * mysql> select value from t1;
979- * +-------+
980- * | value |
981- * +-------+
982- * | 50 |
983- * | 70 |
984- * +-------+
985- * 2 rows in set (55.99 sec)
986- *
987- * mysql> select * from t1;
988- * +----+-------+
989- * | ID | value |
990- * +----+-------+
991- * | 1 | 70 |
992- * +----+-------+
993- * 1 row in set (0.00 sec)
994- */
995- XTTabRecExtDPtr ext_rec;
996- xtWord4 log_over_size = 0;
997- xtLogID data_log_id = 0;
998- xtLogOffset data_log_offset = 0;
999- u_int cols_required = 0;
1000- xtBool record_loaded;
1001- size_t rec_size;
1002-
1003- rec_id = XT_GET_DISK_4(record->rb.rb_rec_id_4);
1004- rec_size = XT_GET_DISK_2(record->rb.rb_size_2);
1005-
1006- ext_rec = (XTTabRecExtDPtr) &record->rb.rb_rec_type_1;
1007-
1008- if (XT_REC_IS_EXT_DLOG(record->rb.rb_rec_type_1)) {
1009- /* Save this for later (can be overwritten by xres_load_record(): */
1010- data_log_id = XT_GET_DISK_2(ext_rec->re_log_id_2);
1011- data_log_offset = XT_GET_DISK_6(ext_rec->re_log_offs_6);
1012- log_over_size = XT_GET_DISK_4(ext_rec->re_log_dat_siz_4);
1013- }
1014-
1015- record_loaded = FALSE;
1016-
1017- if (check_index) {
1018- cols_required = tab->tab_dic.dic_ind_cols_req;
1019-#ifdef XT_STREAMING
1020- if (tab->tab_dic.dic_blob_cols_req > cols_required)
1021- cols_required = tab->tab_dic.dic_blob_cols_req;
1022-#endif
1023- if (!(rec_data = xres_load_record(self, ot, rec_id, &record->rb.rb_rec_type_1, rec_size, rec_buf, cols_required)))
1024- goto go_on_to_free;
1025- record_loaded = TRUE;
1026- xres_remove_index_entries(ot, rec_id, rec_data);
1027- }
1028-
1029-#ifdef XT_STREAMING
1030- if (tab->tab_dic.dic_blob_count) {
1031- if (!record_loaded) {
1032- cols_required = tab->tab_dic.dic_blob_cols_req;
1033- if (!(rec_data = xres_load_record(self, ot, rec_id, &record->rb.rb_rec_type_1, rec_size, rec_buf, cols_required)))
1034- /* [(7)] REMOVE is followed by FREE:
1035- goto get_rec_offset;
1036- */
1037- goto go_on_to_free;
1038- record_loaded = TRUE;
1039- }
1040- myxt_release_blobs(ot, rec_data, rec_id);
1041- }
1042-#endif
1043-
1044- if (data_log_id && data_log_offset && log_over_size) {
1045- if (!ot->ot_thread->st_dlog_buf.dlb_delete_log(data_log_id, data_log_offset, log_over_size, tab->tab_id, rec_id, self)) {
1046- if (ot->ot_thread->t_exception.e_xt_err != XT_ERR_BAD_EXT_RECORD &&
1047- ot->ot_thread->t_exception.e_xt_err != XT_ERR_DATA_LOG_NOT_FOUND)
1048- xt_log_and_clear_exception_ns();
1049- }
1050- }
1051-
1052- go_on_to_free:
1053- /* Use the new record type: */
1054- record->rb.rb_rec_type_1 = record->rb.rb_new_rec_type_1;
1055- free_data = (XTTabRecFreeDPtr) &record->rb.rb_rec_type_1;
1056- goto do_rec_freed;
1057- }
1058- case XT_LOG_ENT_REC_FREED:
1059- rec_id = XT_GET_DISK_4(record->fr.fr_rec_id_4);
1060- free_data = (XTTabRecFreeDPtr) &record->fr.fr_rec_type_1;
1061- do_rec_freed:
1062- if (!in_sequence) {
1063- size_t red_size;
1064-
1065- /* Free the record.
1066- * We place the record on front of the current
1067- * free list.
1068- *
1069- * However, before we do this, we remove the record
1070- * from its row list, if the record is on a row list.
1071- *
1072- * We do this here, because in the normal removal
1073- * from the row list uses the operations:
1074- *
1075- * XT_LOG_ENT_REC_UNLINKED, XT_LOG_ENT_ROW_SET and
1076- * XT_LOG_ENT_ROW_FREED.
1077- *
1078- * When operations are performed out of sequence,
1079- * these operations are ignored for the purpose
1080- * of removing the record from the row.
1081- */
1082- if (!XT_PREAD_RR_FILE(ot->ot_rec_file, xt_rec_id_to_rec_offset(tab, rec_id), sizeof(XTTabRecHeadDRec), sizeof(XTTabRecHeadDRec), (xtWord1 *) &rec_head, NULL, &self->st_statistics.st_rec, self))
1083- xt_throw(self);
1084- /* The record is already free: */
1085- if (XT_REC_IS_FREE(rec_head.tr_rec_type_1))
1086- goto free_done;
1087- row_id = XT_GET_DISK_4(rec_head.tr_row_id_4);
1088-
1089- /* Search the row for this record: */
1090- if (!XT_PREAD_RR_FILE(ot->ot_row_file, xt_row_id_to_row_offset(tab, row_id), sizeof(XTTabRowRefDRec), sizeof(XTTabRowRefDRec), (xtWord1 *) &row_buf, NULL, &self->st_statistics.st_rec, self))
1091- xt_throw(self);
1092- link_rec_id = XT_GET_DISK_4(row_buf.rr_ref_id_4);
1093- prev_link_rec_id = 0;
1094- while (link_rec_id) {
1095- if (!XT_PREAD_RR_FILE(ot->ot_rec_file, xt_rec_id_to_rec_offset(tab, link_rec_id), sizeof(XTTabRecHeadDRec), 0, (xtWord1 *) &rec_head, &red_size, &self->st_statistics.st_rec, self)) {
1096- xt_log_and_clear_exception(self);
1097- break;
1098- }
1099- if (red_size < sizeof(XTTabRecHeadDRec))
1100- break;
1101- if (link_rec_id == rec_id)
1102- break;
1103- if (XT_GET_DISK_4(rec_head.tr_row_id_4) != row_id)
1104- break;
1105- switch (rec_head.tr_rec_type_1 & XT_TAB_STATUS_MASK) {
1106- case XT_TAB_STATUS_FREED:
1107- break;
1108- case XT_TAB_STATUS_DELETE:
1109- case XT_TAB_STATUS_FIXED:
1110- case XT_TAB_STATUS_VARIABLE:
1111- case XT_TAB_STATUS_EXT_DLOG:
1112- break;
1113- default:
1114- ASSERT(FALSE);
1115- goto exit_loop;
1116- }
1117- if (rec_head.tr_rec_type_1 & ~(XT_TAB_STATUS_CLEANED_BIT | XT_TAB_STATUS_MASK)) {
1118- ASSERT(FALSE);
1119- break;
1120- }
1121- prev_link_rec_id = link_rec_id;
1122- link_rec_id = XT_GET_DISK_4(rec_head.tr_prev_rec_id_4);
1123- }
1124-
1125- exit_loop:
1126- if (link_rec_id == rec_id) {
1127- /* The record was found on the row list, remove it: */
1128- if (prev_link_rec_id) {
1129- /* We write the previous variation pointer from position 'link_rec_id' into
1130- * variation pointer of the 'prev_link_rec_id' record. This unlinks 'link_rec_id'!
1131- */
1132- if (!XT_PWRITE_RR_FILE(ot->ot_rec_file, xt_rec_id_to_rec_offset(tab, prev_link_rec_id) + offsetof(XTTabRecHeadDRec, tr_prev_rec_id_4), XT_RECORD_ID_SIZE, (xtWord1 *) &rec_head.tr_prev_rec_id_4, &ot->ot_thread->st_statistics.st_rec, ot->ot_thread))
1133- xt_throw(self);
1134- tab->tab_bytes_to_flush += XT_RECORD_ID_SIZE;
1135- }
1136- else {
1137- /* The record is at the front of the row list: */
1138- xtRefID ref_id = XT_GET_DISK_4(rec_head.tr_prev_rec_id_4);
1139- XT_SET_DISK_4(row_buf.rr_ref_id_4, ref_id);
1140- if (!XT_PWRITE_RR_FILE(ot->ot_row_file, xt_row_id_to_row_offset(tab, row_id), sizeof(XTTabRowRefDRec), (xtWord1 *) &row_buf, &ot->ot_thread->st_statistics.st_rec, ot->ot_thread))
1141- xt_throw(self);
1142- tab->tab_bytes_to_flush += sizeof(XTTabRowRefDRec);
1143- }
1144- }
1145-
1146- /* Now we free the record, by placing it at the front of
1147- * the free list:
1148- */
1149- XT_SET_DISK_4(free_data->rf_next_rec_id_4, tab->tab_head_rec_free_id);
1150- }
1151- tab->tab_head_rec_free_id = rec_id;
1152- tab->tab_head_rec_fnum++;
1153- if (!XT_PWRITE_RR_FILE(ot->ot_rec_file, xt_rec_id_to_rec_offset(tab, rec_id), sizeof(XTTabRecFreeDRec), (xtWord1 *) free_data, &ot->ot_thread->st_statistics.st_rec, ot->ot_thread))
1154- xt_throw(self);
1155- tab->tab_bytes_to_flush += sizeof(XTTabRecFreeDRec);
1156- tab->tab_flush_pending = TRUE;
1157- free_done:
1158- break;
1159- case XT_LOG_ENT_REC_MOVED:
1160- len = 8;
1161- rec_id = XT_GET_DISK_4(record->xw.xw_rec_id_4);
1162- if (!XT_PWRITE_RR_FILE(ot->ot_rec_file, xt_rec_id_to_rec_offset(tab, rec_id) + offsetof(XTTabRecExtDRec, re_log_id_2), len, (xtWord1 *) &record->xw.xw_rec_type_1, &ot->ot_thread->st_statistics.st_rec, ot->ot_thread))
1163- xt_throw(self);
1164- tab->tab_bytes_to_flush += len;
1165- tab->tab_flush_pending = TRUE;
1166- break;
1167- case XT_LOG_ENT_REC_CLEANED:
1168- len = offsetof(XTTabRecHeadDRec, tr_prev_rec_id_4) + XT_RECORD_ID_SIZE;
1169- goto get_rec_offset;
1170- case XT_LOG_ENT_REC_CLEANED_1:
1171- len = 1;
1172- goto get_rec_offset;
1173- case XT_LOG_ENT_REC_UNLINKED:
1174- if (!in_sequence) {
1175- /* Unlink the record.
1176- * This is done when the record is freed.
1177- */
1178- break;
1179- }
1180- len = offsetof(XTTabRecHeadDRec, tr_prev_rec_id_4) + XT_RECORD_ID_SIZE;
1181- get_rec_offset:
1182- rec_id = XT_GET_DISK_4(record->xw.xw_rec_id_4);
1183- if (!XT_PWRITE_RR_FILE(ot->ot_rec_file, xt_rec_id_to_rec_offset(tab, rec_id), len, (xtWord1 *) &record->xw.xw_rec_type_1, &ot->ot_thread->st_statistics.st_rec, ot->ot_thread))
1184- xt_throw(self);
1185- tab->tab_bytes_to_flush += len;
1186- tab->tab_flush_pending = TRUE;
1187- break;
1188- case XT_LOG_ENT_ROW_NEW:
1189- len = offsetof(XTactRowAddedEntryDRec, xa_free_list_4);
1190- row_id = XT_GET_DISK_4(record->xa.xa_row_id_4);
1191- if (!in_sequence) {
1192- /* A row was allocated from the EOF. Because operations are missing.
1193- * The blocks between the current EOF and the new EOF need to be
1194- * place on the free list!
1195- */
1196- while (tab->tab_head_row_eof_id < row_id) {
1197- XT_SET_DISK_4(row_buf.rr_ref_id_4, tab->tab_head_row_free_id);
1198- if (!XT_PWRITE_RR_FILE(ot->ot_row_file, xt_row_id_to_row_offset(tab, tab->tab_head_row_eof_id), sizeof(XTTabRowRefDRec), (xtWord1 *) &row_buf, &ot->ot_thread->st_statistics.st_rec, ot->ot_thread))
1199- xt_throw(self);
1200- tab->tab_bytes_to_flush += sizeof(XTTabRowRefDRec);
1201- tab->tab_head_row_free_id = tab->tab_head_row_eof_id;
1202- tab->tab_head_row_eof_id++;
1203- }
1204- }
1205- if (tab->tab_head_row_eof_id < row_id + 1)
1206- tab->tab_head_row_eof_id = row_id + 1;
1207- tab->tab_flush_pending = TRUE;
1208- break;
1209- case XT_LOG_ENT_ROW_NEW_FL:
1210- len = sizeof(XTactRowAddedEntryDRec);
1211- row_id = XT_GET_DISK_4(record->xa.xa_row_id_4);
1212- free_ref_id = XT_GET_DISK_4(record->xa.xa_free_list_4);
1213- if (!in_sequence) {
1214- size_t red_size;
1215- /* The record was taken from the free list.
1216- * If the operations were in sequence, then this would be
1217- * the front of the free list now.
1218- * However, because operations are missing, it may no
1219- * longer be the front of the free list!
1220- * Search and remove:
1221- */
1222- link_rec_id = tab->tab_head_row_free_id;
1223- prev_link_rec_id = 0;
1224- while (link_rec_id) {
1225- if (!XT_PREAD_RR_FILE(ot->ot_row_file, xt_row_id_to_row_offset(tab, link_rec_id), sizeof(XTTabRowRefDRec), 0, (xtWord1 *) &row_buf, &red_size, &self->st_statistics.st_rec, self)) {
1226- xt_log_and_clear_exception(self);
1227- break;
1228- }
1229- if (red_size < sizeof(XTTabRowRefDRec))
1230- break;
1231- if (link_rec_id == row_id)
1232- break;
1233- prev_link_rec_id = link_rec_id;
1234- link_rec_id = XT_GET_DISK_4(row_buf.rr_ref_id_4);
1235- }
1236- if (link_rec_id == row_id) {
1237- /* The block was found on the free list, remove it: */
1238- if (prev_link_rec_id) {
1239- /* We write the record from position 'link_rec_id' into
1240- * position 'prev_link_rec_id'. This unlinks 'link_rec_id'!
1241- */
1242- if (!XT_PWRITE_RR_FILE(ot->ot_row_file, xt_row_id_to_row_offset(tab, prev_link_rec_id), sizeof(XTTabRowRefDRec), (xtWord1 *) &row_buf, &ot->ot_thread->st_statistics.st_rec, ot->ot_thread))
1243- xt_throw(self);
1244- tab->tab_bytes_to_flush += sizeof(XTTabRowRefDRec);
1245- free_ref_id = tab->tab_head_row_free_id;
1246- }
1247- else
1248- /* The block is at the front of the free list: */
1249- free_ref_id = XT_GET_DISK_4(row_buf.rr_ref_id_4);
1250- }
1251- else {
1252- /* Not found? */
1253- if (tab->tab_head_row_eof_id < row_id + 1)
1254- tab->tab_head_row_eof_id = row_id + 1;
1255- break;
1256- }
1257-
1258- }
1259- if (tab->tab_head_row_eof_id < row_id + 1)
1260- tab->tab_head_row_eof_id = row_id + 1;
1261- tab->tab_head_row_free_id = free_ref_id;
1262- tab->tab_head_row_fnum--;
1263- tab->tab_flush_pending = TRUE;
1264- break;
1265- case XT_LOG_ENT_ROW_FREED:
1266- row_id = XT_GET_DISK_4(record->wr.wr_row_id_4);
1267- if (!in_sequence) {
1268- /* Free the row.
1269- * Since this operation is being performed out of sequence, we
1270- * must assume that some other free and allocation operations
1271- * must be missing.
1272- * For this reason, we add the row to the front of the
1273- * existing free list.
1274- */
1275- XT_SET_DISK_4(record->wr.wr_ref_id_4, tab->tab_head_row_free_id);
1276- }
1277- tab->tab_head_row_free_id = row_id;
1278- tab->tab_head_row_fnum++;
1279- goto write_row_data;
1280- case XT_LOG_ENT_ROW_ADD_REC:
1281- row_id = XT_GET_DISK_4(record->wr.wr_row_id_4);
1282- if (!in_sequence) {
1283- if (!XT_PREAD_RR_FILE(ot->ot_row_file, xt_row_id_to_row_offset(tab, row_id), sizeof(XTTabRowRefDRec), 0, (xtWord1 *) &row_buf, &tfer, &self->st_statistics.st_rec, self))
1284- xt_throw(self);
1285- if (tfer == sizeof(XTTabRowRefDRec)) {
1286- /* Add a record to the front of the row.
1287- * This is easy, but we have to make sure that the next
1288- * pointer in the record is correct.
1289- */
1290- rec_id = XT_GET_DISK_4(record->wr.wr_ref_id_4);
1291- if (!XT_PREAD_RR_FILE(ot->ot_rec_file, xt_rec_id_to_rec_offset(tab, rec_id), sizeof(XTTabRecHeadDRec), 0, (xtWord1 *) &rec_head, &tfer, &self->st_statistics.st_rec, self))
1292- xt_throw(self);
1293- if (tfer == sizeof(XTTabRecHeadDRec) && XT_GET_DISK_4(rec_head.tr_row_id_4) == row_id) {
1294- /* This is now the correct next pointer: */
1295- xtRecordID next_ref_id = XT_GET_DISK_4(row_buf.rr_ref_id_4);
1296- if (XT_GET_DISK_4(rec_head.tr_prev_rec_id_4) != next_ref_id &&
1297- rec_id != next_ref_id) {
1298- XT_SET_DISK_4(rec_head.tr_prev_rec_id_4, next_ref_id);
1299- if (!XT_PWRITE_RR_FILE(ot->ot_rec_file, xt_rec_id_to_rec_offset(tab, rec_id), sizeof(XTTabRecHeadDRec), (xtWord1 *) &rec_head, &ot->ot_thread->st_statistics.st_rec, ot->ot_thread))
1300- xt_throw(self);
1301- tab->tab_bytes_to_flush += sizeof(XTTabRecHeadDRec);
1302- }
1303- }
1304- }
1305-
1306- }
1307- goto write_row_data;
1308- case XT_LOG_ENT_ROW_SET:
1309- if (!in_sequence)
1310- /* This operation is ignored when out of sequence!
1311- * The operation is used to remove a record from a row.
1312- * This is done automatically when the record is freed.
1313- */
1314- break;
1315- row_id = XT_GET_DISK_4(record->wr.wr_row_id_4);
1316- write_row_data:
1317- ASSERT_NS(XT_GET_DISK_4(record->wr.wr_ref_id_4) < tab->tab_head_rec_eof_id);
1318- if (!XT_PWRITE_RR_FILE(ot->ot_row_file, xt_row_id_to_row_offset(tab, row_id), sizeof(XTTabRowRefDRec), (xtWord1 *) &record->wr.wr_ref_id_4, &ot->ot_thread->st_statistics.st_rec, self))
1319- xt_throw(self);
1320- tab->tab_bytes_to_flush += sizeof(XTTabRowRefDRec);
1321- if (tab->tab_head_row_eof_id < row_id + 1)
1322- tab->tab_head_row_eof_id = row_id + 1;
1323- tab->tab_flush_pending = TRUE;
1324- break;
1325- case XT_LOG_ENT_NO_OP:
1326- case XT_LOG_ENT_END_OF_LOG:
1327- break;
1328- }
1329-}
1330-
1331-/*
1332- * Apply all operations that have been buffered
1333- * for a particular table.
1334- * Operations are buffered if they are
1335- * read from the log out of sequence.
1336- *
1337- * In this case we buffer, and wait for the
1338- * out of sequence operations to arrive.
1339- *
1340- * When the server is running, this will always be
1341- * the case. A delay occurs while a transaction
1342- * fills its private log buffer.
1343- */
1344-static void xres_apply_operations(XTThreadPtr self, XTWriterStatePtr ws, xtBool in_sequence)
1345-{
1346- XTTableHPtr tab = ws->ws_ot->ot_table;
1347- u_int i = 0;
1348- XTOperationPtr op;
1349- xtBool check_index;
1350-
1351-// XTDatabaseHPtr db, XTOpenTablePtr ot, XTXactSeqReadPtr sr, XTDataBufferPtr databuf
1352- xt_sl_lock(self, tab->tab_op_list);
1353- for (;;) {
1354- op = (XTOperationPtr) xt_sl_item_at(tab->tab_op_list, i);
1355- if (!op)
1356- break;
1357- if (in_sequence && tab->tab_head_op_seq+1 != op->or_op_seq)
1358- break;
1359- xt_db_set_size(self, &ws->ws_databuf, (size_t) op->or_op_len);
1360- if (!ws->ws_db->db_xlog.xlog_rnd_read(&ws->ws_seqread, op->or_log_id, op->or_log_offset, (size_t) op->or_op_len, ws->ws_databuf.db_data, NULL, self))
1361- xt_throw(self);
1362- check_index = ws->ws_in_recover && xt_comp_log_pos(op->or_log_id, op->or_log_offset, ws->ws_ind_rec_log_id, ws->ws_ind_rec_log_offset) >= 0;
1363- xres_apply_change(self, ws->ws_ot, (XTXactLogBufferDPtr) ws->ws_databuf.db_data, in_sequence, check_index, &ws->ws_rec_buf);
1364- tab->tab_head_op_seq = op->or_op_seq;
1365- if (tab->tab_wr_wake_freeer) {
1366- if (!XTTableSeq::xt_op_is_before(tab->tab_head_op_seq, tab->tab_wake_freeer_op))
1367- xt_wr_wake_freeer(self);
1368- }
1369- i++;
1370- }
1371- xt_sl_remove_from_front(self, tab->tab_op_list, i);
1372- xt_sl_unlock(self, tab->tab_op_list);
1373-}
1374-
1375-/* Check for operations still remaining on tables.
1376- * These operations are applied even though operations
1377- * in sequence are missing.
1378- */
1379-xtBool xres_sync_operations(XTThreadPtr self, XTDatabaseHPtr db, XTWriterStatePtr ws)
1380-{
1381- u_int edx;
1382- XTTableEntryPtr te_ptr;
1383- XTTableHPtr tab;
1384- xtBool op_synced = FALSE;
1385-
1386- xt_enum_tables_init(&edx);
1387- while ((te_ptr = xt_enum_tables_next(self, db, &edx))) {
1388- /* Dirty read of tab_op_list OK, here because this is the
1389- * only thread that updates the list!
1390- */
1391- if ((tab = te_ptr->te_table)) {
1392- if (xt_sl_get_size(tab->tab_op_list)) {
1393- op_synced = TRUE;
1394- if (xres_open_table(self, ws, te_ptr->te_tab_id))
1395- xres_apply_operations(self, ws, FALSE);
1396- }
1397-
1398- /* Update the pointer cache: */
1399- tab->tab_seq.xt_op_seq_set(self, tab->tab_head_op_seq+1);
1400- tab->tab_row_eof_id = tab->tab_head_row_eof_id;
1401- tab->tab_row_free_id = tab->tab_head_row_free_id;
1402- tab->tab_row_fnum = tab->tab_head_row_fnum;
1403- tab->tab_rec_eof_id = tab->tab_head_rec_eof_id;
1404- tab->tab_rec_free_id = tab->tab_head_rec_free_id;
1405- tab->tab_rec_fnum = tab->tab_head_rec_fnum;
1406- }
1407- }
1408- return op_synced;
1409-}
1410-
1411-/*
1412- * Operations from the log are applied in sequence order.
1413- * If the operations are out of sequence, they are buffered
1414- * until the missing operations appear.
1415- *
1416- * NOTE: No lock is required because there should only be
1417- * one thread that does this!
1418- */
1419-xtPublic void xt_xres_apply_in_order(XTThreadPtr self, XTWriterStatePtr ws, xtLogID log_id, xtLogOffset log_offset, XTXactLogBufferDPtr record)
1420-{
1421- xtOpSeqNo op_seq;
1422- xtTableID tab_id;
1423- size_t len;
1424- xtBool check_index;
1425-
1426-// XTDatabaseHPtr db, XTOpenTablePtr *ot, XTXactSeqReadPtr sr, XTDataBufferPtr databuf
1427- switch (record->xl.xl_status_1) {
1428- case XT_LOG_ENT_REC_MODIFIED:
1429- case XT_LOG_ENT_UPDATE:
1430- case XT_LOG_ENT_INSERT:
1431- case XT_LOG_ENT_DELETE:
1432- case XT_LOG_ENT_UPDATE_BG:
1433- case XT_LOG_ENT_INSERT_BG:
1434- case XT_LOG_ENT_DELETE_BG:
1435- len = offsetof(XTactUpdateEntryDRec, xu_rec_type_1) + (size_t) XT_GET_DISK_2(record->xu.xu_size_2);
1436- op_seq = XT_GET_DISK_4(record->xu.xu_op_seq_4);
1437- tab_id = XT_GET_DISK_4(record->xu.xu_tab_id_4);
1438- break;
1439- case XT_LOG_ENT_UPDATE_FL:
1440- case XT_LOG_ENT_INSERT_FL:
1441- case XT_LOG_ENT_DELETE_FL:
1442- case XT_LOG_ENT_UPDATE_FL_BG:
1443- case XT_LOG_ENT_INSERT_FL_BG:
1444- case XT_LOG_ENT_DELETE_FL_BG:
1445- len = offsetof(XTactUpdateFLEntryDRec, xf_rec_type_1) + (size_t) XT_GET_DISK_2(record->xf.xf_size_2);
1446- op_seq = XT_GET_DISK_4(record->xf.xf_op_seq_4);
1447- tab_id = XT_GET_DISK_4(record->xf.xf_tab_id_4);
1448- break;
1449- case XT_LOG_ENT_REC_FREED:
1450- case XT_LOG_ENT_REC_REMOVED:
1451- case XT_LOG_ENT_REC_REMOVED_EXT:
1452- /* [(7)] REMOVE is now a extended version of FREE! */
1453- len = offsetof(XTactFreeRecEntryDRec, fr_rec_type_1) + sizeof(XTTabRecFreeDRec);
1454- goto fixed_len_data;
1455- case XT_LOG_ENT_REC_REMOVED_BI:
1456- len = offsetof(XTactRemoveBIEntryDRec, rb_rec_type_1) + (size_t) XT_GET_DISK_2(record->rb.rb_size_2);
1457- op_seq = XT_GET_DISK_4(record->rb.rb_op_seq_4);
1458- tab_id = XT_GET_DISK_4(record->rb.rb_tab_id_4);
1459- break;
1460- case XT_LOG_ENT_REC_MOVED:
1461- len = offsetof(XTactWriteRecEntryDRec, xw_rec_type_1) + 8;
1462- goto fixed_len_data;
1463- case XT_LOG_ENT_REC_CLEANED:
1464- len = offsetof(XTactWriteRecEntryDRec, xw_rec_type_1) + offsetof(XTTabRecHeadDRec, tr_prev_rec_id_4) + XT_RECORD_ID_SIZE;
1465- goto fixed_len_data;
1466- case XT_LOG_ENT_REC_CLEANED_1:
1467- len = offsetof(XTactWriteRecEntryDRec, xw_rec_type_1) + 1;
1468- goto fixed_len_data;
1469- case XT_LOG_ENT_REC_UNLINKED:
1470- len = offsetof(XTactWriteRecEntryDRec, xw_rec_type_1) + offsetof(XTTabRecHeadDRec, tr_prev_rec_id_4) + XT_RECORD_ID_SIZE;
1471- fixed_len_data:
1472- op_seq = XT_GET_DISK_4(record->xw.xw_op_seq_4);
1473- tab_id = XT_GET_DISK_4(record->xw.xw_tab_id_4);
1474- break;
1475- case XT_LOG_ENT_ROW_NEW:
1476- len = sizeof(XTactRowAddedEntryDRec) - 4;
1477- goto new_row;
1478- case XT_LOG_ENT_ROW_NEW_FL:
1479- len = sizeof(XTactRowAddedEntryDRec);
1480- new_row:
1481- op_seq = XT_GET_DISK_4(record->xa.xa_op_seq_4);
1482- tab_id = XT_GET_DISK_4(record->xa.xa_tab_id_4);
1483- break;
1484- case XT_LOG_ENT_ROW_ADD_REC:
1485- case XT_LOG_ENT_ROW_SET:
1486- case XT_LOG_ENT_ROW_FREED:
1487- len = offsetof(XTactWriteRowEntryDRec, wr_ref_id_4) + sizeof(XTTabRowRefDRec);
1488- op_seq = XT_GET_DISK_4(record->wr.wr_op_seq_4);
1489- tab_id = XT_GET_DISK_4(record->wr.wr_tab_id_4);
1490- break;
1491- case XT_LOG_ENT_NO_OP:
1492- case XT_LOG_ENT_END_OF_LOG:
1493- return;
1494- default:
1495- return;
1496- }
1497-
1498- if (!xres_open_table(self, ws, tab_id))
1499- return;
1500-
1501- XTTableHPtr tab = ws->ws_ot->ot_table;
1502-
1503- /* NOTE:
1504- *
1505- * During normal operation this is actually given.
1506- *
1507- * During recovery, it only applies to the record/row files
1508- * The index file is flushed indepently, and changes may
1509- * have been applied to the index (due to a call to flush index,
1510- * which comes as a result of out of memory) that have not been
1511- * applied to the record/row files.
1512- *
1513- * As a result we need to do the index checks that apply to this
1514- * change.
1515- *
1516- * At the moment, I will just do everything, which should not
1517- * hurt!
1518- *
1519- * This error can be repeated by running the test
1520- * runTest(OUT_OF_CACHE_UPDATE_TEST, 32, OUT_OF_CACHE_UPDATE_TEST_UPDATE_COUNT, OUT_OF_CACHE_UPDATE_TEST_SET_SIZE)
1521- * and crashing after a while.
1522- *
1523- * Do this by setting not_this to NULL. This will cause the test to
1524- * hang after a while. After a restart the indexes are corrupt if the
1525- * ws->ws_in_recover condition is not present here.
1526- */
1527- if (ws->ws_in_recover) {
1528- if (!tab->tab_recovery_done) {
1529- /* op_seq <= tab_head_op_seq + 1: */
1530- ASSERT(XTTableSeq::xt_op_is_before(op_seq, tab->tab_head_op_seq+2));
1531- if (XTTableSeq::xt_op_is_before(op_seq-1, tab->tab_head_op_seq))
1532- /* Adjust the operation sequence number: */
1533- tab->tab_head_op_seq = op_seq-1;
1534- tab->tab_recovery_done = TRUE;
1535- }
1536- }
1537-
1538- if (!XTTableSeq::xt_op_is_before(tab->tab_head_op_seq, op_seq))
1539- return;
1540-
1541- if (tab->tab_head_op_seq+1 == op_seq) {
1542- /* I could use tab_ind_rec_log_id, but this may be a problem, if
1543- * recovery does not recover up to the last committed transaction.
1544- */
1545- check_index = ws->ws_in_recover && xt_comp_log_pos(log_id, log_offset, ws->ws_ind_rec_log_id, ws->ws_ind_rec_log_offset) >= 0;
1546- xres_apply_change(self, ws->ws_ot, record, TRUE, check_index, &ws->ws_rec_buf);
1547- tab->tab_head_op_seq = op_seq;
1548- if (tab->tab_wr_wake_freeer) {
1549- if (!XTTableSeq::xt_op_is_before(tab->tab_head_op_seq, tab->tab_wake_freeer_op))
1550- xt_wr_wake_freeer(self);
1551- }
1552-
1553- /* Apply any operations in the list that now follow on...
1554- * NOTE: the tab_op_list only has be locked for modification.
1555- * This is because only one thread ever changes the list
1556- * (on startup and the writer), but the checkpoint thread
1557- * reads it.
1558- */
1559- XTOperationPtr op;
1560- if ((op = (XTOperationPtr) xt_sl_first_item(tab->tab_op_list))) {
1561- if (tab->tab_head_op_seq+1 == op->or_op_seq) {
1562- xres_apply_operations(self, ws, TRUE);
1563- }
1564- }
1565- }
1566- else {
1567- /* Add the operation to the list: */
1568- XTOperationRec op;
1569-
1570- op.or_op_seq = op_seq;
1571- op.or_op_len = len;
1572- op.or_log_id = log_id;
1573- op.or_log_offset = log_offset;
1574- xt_sl_lock(self, tab->tab_op_list);
1575- xt_sl_insert(self, tab->tab_op_list, &op_seq, &op);
1576- ASSERT(tab->tab_op_list->sl_usage_count < 1000000);
1577- xt_sl_unlock(self, tab->tab_op_list);
1578- }
1579-}
1580-
1581-/* ----------------------------------------------------------------------
1582- * CHECKPOINTING FUNCTIONALITY
1583- */
1584-
1585-static xtBool xres_delete_data_log(XTDatabaseHPtr db, xtLogID log_id)
1586-{
1587- XTDataLogFilePtr data_log;
1588- char path[PATH_MAX];
1589-
1590- db->db_datalogs.dlc_name(PATH_MAX, path, log_id);
1591-
1592- if (!db->db_datalogs.dlc_remove_data_log(log_id, TRUE))
1593- return FAILED;
1594-
1595- if (xt_fs_exists(path)) {
1596-#ifdef DEBUG_LOG_DELETE
1597- printf("-- delete log: %s\n", path);
1598-#endif
1599- if (!xt_fs_delete(NULL, path))
1600- return FAILED;
1601- }
1602- /* The log was deleted: */
1603- if (!db->db_datalogs.dlc_get_data_log(&data_log, log_id, TRUE, NULL))
1604- return FAILED;
1605- if (data_log) {
1606- if (!db->db_datalogs.dls_set_log_state(data_log, XT_DL_DELETED))
1607- return FAILED;
1608- }
1609- return OK;
1610-}
1611-
1612-static int xres_comp_flush_tabs(XTThreadPtr XT_UNUSED(self), register const void *XT_UNUSED(thunk), register const void *a, register const void *b)
1613-{
1614- xtTableID tab_id = *((xtTableID *) a);
1615- XTCheckPointTablePtr cp_tab = (XTCheckPointTablePtr) b;
1616-
1617- if (tab_id < cp_tab->cpt_tab_id)
1618- return -1;
1619- if (tab_id > cp_tab->cpt_tab_id)
1620- return 1;
1621- return 0;
1622-}
1623-
1624-static void xres_init_checkpoint_state(XTThreadPtr self, XTCheckPointStatePtr cp)
1625-{
1626- xt_init_mutex_with_autoname(self, &cp->cp_state_lock);
1627-}
1628-
1629-static void xres_free_checkpoint_state(XTThreadPtr self, XTCheckPointStatePtr cp)
1630-{
1631- xt_free_mutex(&cp->cp_state_lock);
1632- if (cp->cp_table_ids) {
1633- xt_free_sortedlist(self, cp->cp_table_ids);
1634- cp->cp_table_ids = NULL;
1635- }
1636-}
1637-
1638-/*
1639- * Remove the deleted logs so that they can be re-used.
1640- * This is only possible after a checkpoint has been
1641- * written that does _not_ include these logs as logs
1642- * to be deleted!
1643- */
1644-static xtBool xres_remove_data_logs(XTDatabaseHPtr db)
1645-{
1646- u_int no_of_logs = xt_sl_get_size(db->db_datalogs.dlc_deleted);
1647- xtLogID *log_id_ptr;
1648-
1649- for (u_int i=0; i<no_of_logs; i++) {
1650- log_id_ptr = (xtLogID *) xt_sl_item_at(db->db_datalogs.dlc_deleted, i);
1651- if (!db->db_datalogs.dlc_remove_data_log(*log_id_ptr, FALSE))
1652- return FAILED;
1653- }
1654- xt_sl_set_size(db->db_datalogs.dlc_deleted, 0);
1655- return OK;
1656-}
1657-
1658-/* ----------------------------------------------------------------------
1659- * INIT & EXIT
1660- */
1661-
1662-xtPublic void xt_xres_init(XTThreadPtr self, XTDatabaseHPtr db)
1663-{
1664- xtLogID max_log_id;
1665-
1666- xt_init_mutex_with_autoname(self, &db->db_cp_lock);
1667- xt_init_cond(self, &db->db_cp_cond);
1668-
1669- xres_init_checkpoint_state(self, &db->db_cp_state);
1670- db->db_restart.xres_init(self, db, &db->db_wr_log_id, &db->db_wr_log_offset, &max_log_id);
1671-
1672- /* It is also the position where transactions will start writing the
1673- * log:
1674- */
1675- if (!db->db_xlog.xlog_set_write_offset(db->db_wr_log_id, db->db_wr_log_offset, max_log_id, self))
1676- xt_throw(self);
1677-}
1678-
1679-xtPublic void xt_xres_exit(XTThreadPtr self, XTDatabaseHPtr db)
1680-{
1681- db->db_restart.xres_exit(self);
1682- xres_free_checkpoint_state(self, &db->db_cp_state);
1683- xt_free_mutex(&db->db_cp_lock);
1684- xt_free_cond(&db->db_cp_cond);
1685-}
1686-
1687-/* ----------------------------------------------------------------------
1688- * RESTART FUNCTIONALITY
1689- */
1690-
1691-/*
1692- * Restart the database. This function loads the restart position, and
1693- * applies all changes in the logs, until the end of the log, or
1694- * a corrupted record is found.
1695- *
1696- * The restart position is the position in the log where we know that
1697- * all the changes up to that point have been flushed to the
1698- * database.
1699- *
1700- * This is called the checkpoint position. The checkpoint position
1701- * is written alternately to 2 restart files.
1702- *
1703- * To make a checkpoint:
1704- * Get the current log writer log offset.
1705- * For each table:
1706- * Get the log offset of the next operation on the table, if an
1707- * operation is queued for the table.
1708- * Flush that table, and the operation sequence to the table.
1709- * For each unclean transaction:
1710- * Get the log offset of the begin of the transaction.
1711- * Write the lowest of all log offsets to the restart file!
1712- */
1713-
1714-void XTXactRestart::xres_init(XTThreadPtr self, XTDatabaseHPtr db, xtLogID *log_id, xtLogOffset *log_offset, xtLogID *max_log_id)
1715-{
1716- char path[PATH_MAX];
1717- XTOpenFilePtr of = NULL;
1718- XTXlogCheckpointDPtr res_1_buffer = NULL;
1719- XTXlogCheckpointDPtr res_2_buffer = NULL;
1720- XTXlogCheckpointDPtr use_buffer;
1721- xtLogID ind_rec_log_id = 0;
1722- xtLogOffset ind_rec_log_offset = 0;
1723-
1724- enter_();
1725- xres_db = db;
1726-
1727- ASSERT(!self->st_database);
1728- /* The following call stack:
1729- * XTDatabaseLog::xlog_flush_pending()
1730- * XTDatabaseLog::xlog_flush()
1731- * xt_xlog_flush_log()
1732- * xt_flush_indices()
1733- * idx_out_of_memory_failure()
1734- * xt_idx_delete()
1735- * xres_remove_index_entries()
1736- * xres_apply_change()
1737- * xt_xres_apply_in_order()
1738- * XTXactRestart::xres_restart()
1739- * XTXactRestart::xres_init()
1740- * Leads to st_database being used!
1741- */
1742- self->st_database = db;
1743-
1744-#ifdef SKIP_STARTUP_CHECKPOINT
1745- /* When debugging, we do not checkpoint immediately, just in case
1746- * we detect a problem during recovery.
1747- */
1748- xres_cp_required = FALSE;
1749-#else
1750- xres_cp_required = TRUE;
1751-#endif
1752- xres_cp_number = 0;
1753- try_(a) {
1754-
1755- /* Figure out which restart file to use.
1756- */
1757- xres_name(PATH_MAX, path, 1);
1758- if ((of = xt_open_file(self, path, XT_FS_MISSING_OK))) {
1759- size_t res_1_size;
1760-
1761- res_1_size = (size_t) xt_seek_eof_file(self, of);
1762- res_1_buffer = (XTXlogCheckpointDPtr) xt_malloc(self, res_1_size);
1763- if (!xt_pread_file(of, 0, res_1_size, res_1_size, res_1_buffer, NULL, &self->st_statistics.st_x, self))
1764- xt_throw(self);
1765- xt_close_file(self, of);
1766- of = NULL;
1767- if (!xres_check_checksum(res_1_buffer, res_1_size)) {
1768- xt_free(self, res_1_buffer);
1769- res_1_buffer = NULL;
1770- }
1771- }
1772-
1773- xres_name(PATH_MAX, path, 2);
1774- if ((of = xt_open_file(self, path, XT_FS_MISSING_OK))) {
1775- size_t res_2_size;
1776-
1777- res_2_size = (size_t) xt_seek_eof_file(self, of);
1778- res_2_buffer = (XTXlogCheckpointDPtr) xt_malloc(self, res_2_size);
1779- if (!xt_pread_file(of, 0, res_2_size, res_2_size, res_2_buffer, NULL, &self->st_statistics.st_x, self))
1780- xt_throw(self);
1781- xt_close_file(self, of);
1782- of = NULL;
1783- if (!xres_check_checksum(res_2_buffer, res_2_size)) {
1784- xt_free(self, res_2_buffer);
1785- res_2_buffer = NULL;
1786- }
1787- }
1788-
1789- if (res_1_buffer && res_2_buffer) {
1790- if (xt_comp_log_pos(
1791- XT_GET_DISK_4(res_1_buffer->xcp_log_id_4),
1792- XT_GET_DISK_6(res_1_buffer->xcp_log_offs_6),
1793- XT_GET_DISK_4(res_2_buffer->xcp_log_id_4),
1794- XT_GET_DISK_6(res_2_buffer->xcp_log_offs_6)) > 0) {
1795- /* The first log is the further along than the second: */
1796- xt_free(self, res_2_buffer);
1797- res_2_buffer = NULL;
1798- }
1799- else {
1800- if (XT_GET_DISK_6(res_1_buffer->xcp_chkpnt_no_6) >
1801- XT_GET_DISK_6(res_2_buffer->xcp_chkpnt_no_6)) {
1802- xt_free(self, res_2_buffer);
1803- res_2_buffer = NULL;
1804- }
1805- else {
1806- xt_free(self, res_1_buffer);
1807- res_1_buffer = NULL;
1808- }
1809- }
1810- }
1811-
1812- if (res_1_buffer) {
1813- use_buffer = res_1_buffer;
1814- xres_next_res_no = 2;
1815- }
1816- else {
1817- use_buffer = res_2_buffer;
1818- xres_next_res_no = 1;
1819- }
1820-
1821- /* Read the checkpoint data: */
1822- if (use_buffer) {
1823- u_int no_of_logs;
1824- xtLogID xt_log_id;
1825- xtTableID xt_tab_id;
1826-
1827- xres_cp_number = XT_GET_DISK_6(use_buffer->xcp_chkpnt_no_6);
1828- xres_cp_log_id = XT_GET_DISK_4(use_buffer->xcp_log_id_4);
1829- xres_cp_log_offset = XT_GET_DISK_6(use_buffer->xcp_log_offs_6);
1830- xt_tab_id = XT_GET_DISK_4(use_buffer->xcp_tab_id_4);
1831- if (xt_tab_id > db->db_curr_tab_id)
1832- db->db_curr_tab_id = xt_tab_id;
1833- db->db_xn_curr_id = XT_GET_DISK_4(use_buffer->xcp_xact_id_4);
1834- ind_rec_log_id = XT_GET_DISK_4(use_buffer->xcp_ind_rec_log_id_4);
1835- ind_rec_log_offset = XT_GET_DISK_6(use_buffer->xcp_ind_rec_log_offs_6);
1836- no_of_logs = XT_GET_DISK_2(use_buffer->xcp_log_count_2);
1837-
1838-#ifdef DEBUG_PRINT
1839- printf("CHECKPOINT log=%d offset=%d ", (int) xres_cp_log_id, (int) xres_cp_log_offset);
1840- if (no_of_logs)
1841- printf("DELETED LOGS: ");
1842-#endif
1843-
1844- /* Logs that are deleted are locked until _after_ the next
1845- * checkpoint.
1846- *
1847- * To prevent the following problem from occuring:
1848- * - Recovery is performed, and log X is deleted
1849- * - After delete a log is free for re-use.
1850- * New data is writen to log X.
1851- * - Server crashes.
1852- * - Recovery is performed from previous checkpoint,
1853- * and log X is deleted again.
1854- *
1855- * To lock the logs the are placed on the deleted list.
1856- * After the next checkpoint, all logs on this list
1857- * will be removed.
1858- */
1859- for (u_int i=0; i<no_of_logs; i++) {
1860- xt_log_id = (xtLogID) XT_GET_DISK_2(use_buffer->xcp_del_log[i]);
1861-#ifdef DEBUG_PRINT
1862- if (i != 0)
1863- printf(", ");
1864- printf("%d", (int) xt_log_id);
1865-#endif
1866-#ifdef DEBUG_KEEP_LOGS
1867- xt_dl_set_to_delete(self, db, xt_log_id);
1868-#else
1869- if (!xres_delete_data_log(db, xt_log_id))
1870- xt_throw(self);
1871-#endif
1872- }
1873-
1874-#ifdef DEBUG_PRINT
1875- printf("\n");
1876-#endif
1877- }
1878- else {
1879- /* Try to determine the correct start point. */
1880- xres_cp_number = 0;
1881- xres_cp_log_id = xt_xlog_get_min_log(self, db);
1882- xres_cp_log_offset = 0;
1883- ind_rec_log_id = xres_cp_log_id;
1884- ind_rec_log_offset = xres_cp_log_offset;
1885-
1886-#ifdef DEBUG_PRINT
1887- printf("CHECKPOINT log=1 offset=0\n");
1888-#endif
1889- }
1890-
1891- if (res_1_buffer) {
1892- xt_free(self, res_1_buffer);
1893- res_1_buffer = NULL;
1894- }
1895- if (res_2_buffer) {
1896- xt_free(self, res_2_buffer);
1897- res_2_buffer = NULL;
1898- }
1899-
1900- if (!xres_restart(self, log_id, log_offset, ind_rec_log_id, ind_rec_log_offset, max_log_id))
1901- xt_throw(self);
1902- }
1903- catch_(a) {
1904- self->st_database = NULL;
1905- if (of)
1906- xt_close_file(self, of);
1907- if (res_1_buffer)
1908- xt_free(self, res_1_buffer);
1909- if (res_2_buffer)
1910- xt_free(self, res_2_buffer);
1911- xres_exit(self);
1912- throw_();
1913- }
1914- cont_(a);
1915- self->st_database = NULL;
1916-
1917- exit_();
1918-}
1919-
1920-void XTXactRestart::xres_exit(XTThreadPtr XT_UNUSED(self))
1921-{
1922-}
1923-
1924-void XTXactRestart::xres_name(size_t size, char *path, xtLogID log_id)
1925-{
1926- char name[50];
1927-
1928- sprintf(name, "restart-%lu.xt", (u_long) log_id);
1929- xt_strcpy(size, path, xres_db->db_main_path);
1930- xt_add_system_dir(size, path);
1931- xt_add_dir_char(size, path);
1932- xt_strcat(size, path, name);
1933-}
1934-
1935-xtBool XTXactRestart::xres_check_checksum(XTXlogCheckpointDPtr buffer, size_t size)
1936-{
1937- size_t head_size;
1938-
1939- /* The minimum size: */
1940- if (size < offsetof(XTXlogCheckpointDRec, xcp_head_size_4) + 4)
1941- return FAILED;
1942-
1943- /* Check the sizes: */
1944- head_size = XT_GET_DISK_4(buffer->xcp_head_size_4);
1945- if (size < head_size)
1946- return FAILED;
1947-
1948- if (XT_GET_DISK_2(buffer->xcp_checksum_2) != xt_get_checksum(((xtWord1 *) buffer) + 2, size - 2, 1))
1949- return FAILED;
1950-
1951- if (XT_GET_DISK_2(buffer->xcp_version_2) != XT_CHECKPOINT_VERSION)
1952- return FAILED;
1953-
1954- return OK;
1955-}
1956-
1957-void XTXactRestart::xres_recover_progress(XTThreadPtr self, XTOpenFilePtr *of, int perc)
1958-{
1959-#ifdef XT_USE_GLOBAL_DB
1960- if (!perc) {
1961- char file_path[PATH_MAX];
1962-
1963- xt_strcpy(PATH_MAX, file_path, xres_db->db_main_path);
1964- xt_add_pbxt_file(PATH_MAX, file_path, "recovery-progress");
1965- *of = xt_open_file(self, file_path, XT_FS_CREATE | XT_FS_MAKE_PATH);
1966- xt_set_eof_file(self, *of, 0);
1967- }
1968-
1969- if (perc > 100) {
1970- char file_path[PATH_MAX];
1971-
1972- if (*of) {
1973- xt_close_file(self, *of);
1974- *of = NULL;
1975- }
1976- xt_strcpy(PATH_MAX, file_path, xres_db->db_main_path);
1977- xt_add_pbxt_file(PATH_MAX, file_path, "recovery-progress");
1978- if (xt_fs_exists(file_path))
1979- xt_fs_delete(self, file_path);
1980- }
1981- else {
1982- char number[40];
1983-
1984- sprintf(number, "%d", perc);
1985- if (!xt_pwrite_file(*of, 0, strlen(number), number, &self->st_statistics.st_x, self))
1986- xt_throw(self);
1987- if (!xt_flush_file(*of, &self->st_statistics.st_x, self))
1988- xt_throw(self);
1989- }
1990-#endif
1991-}
1992-
1993-xtBool XTXactRestart::xres_restart(XTThreadPtr self, xtLogID *log_id, xtLogOffset *log_offset, xtLogID ind_rec_log_id, xtLogOffset ind_rec_log_offset, xtLogID *max_log_id)
1994-{
1995- xtBool ok = TRUE;
1996- XTDatabaseHPtr db = xres_db;
1997- XTXactLogBufferDPtr record;
1998- xtXactID xn_id;
1999- XTXactDataPtr xact;
2000- xtTableID tab_id;
2001- XTWriterStateRec ws;
2002- off_t bytes_read = 0;
2003- off_t bytes_to_read;
2004- volatile xtBool print_progress = FALSE;
2005- volatile off_t perc_size = 0, next_goal = 0;
2006- int perc_complete = 1;
2007- XTOpenFilePtr progress_file = NULL;
2008- xtBool min_ram_xn_id_set = FALSE;
2009- u_int log_count;
2010-
2011- memset(&ws, 0, sizeof(ws));
2012-
2013- ws.ws_db = db;
2014- ws.ws_in_recover = TRUE;
2015- ws.ws_ind_rec_log_id = ind_rec_log_id;
2016- ws.ws_ind_rec_log_offset = ind_rec_log_offset;
2017-
2018- /* Initialize the data log buffer (required if extended data is
2019- * referenced).
2020- * Note: this buffer is freed later. It is part of the thread
2021- * "open database" state, and this means that a thread
2022- * may not have another database open (in use) when
2023- * it calls this functions.
2024- */
2025- self->st_dlog_buf.dlb_init(db, xt_db_log_buffer_size);
2026-
2027- if (!db->db_xlog.xlog_seq_init(&ws.ws_seqread, xt_db_log_buffer_size, TRUE))
2028- return FAILED;
2029-
2030- bytes_to_read = xres_bytes_to_read(self, db, &log_count, max_log_id);
2031- /* Don't print anything about recovering an empty database: */
2032- if (bytes_to_read != 0)
2033- xt_logf(XT_NT_INFO, "PBXT: Recovering from %lu-%llu, bytes to read: %llu\n", (u_long) xres_cp_log_id, (u_llong) xres_cp_log_offset, (u_llong) bytes_to_read);
2034- if (bytes_to_read >= 10*1024*1024) {
2035- print_progress = TRUE;
2036- perc_size = bytes_to_read / 100;
2037- next_goal = perc_size;
2038- xres_recover_progress(self, &progress_file, 0);
2039- }
2040-
2041- if (!db->db_xlog.xlog_seq_start(&ws.ws_seqread, xres_cp_log_id, xres_cp_log_offset, FALSE)) {
2042- ok = FALSE;
2043- goto failed;
2044- }
2045-
2046- try_(a) {
2047- for (;;) {
2048- if (!db->db_xlog.xlog_seq_next(&ws.ws_seqread, &record, TRUE, self)) {
2049- ok = FALSE;
2050- break;
2051- }
2052- /* Increment before. If record is NULL then xseq_record_len will be zero,
2053- * UNLESS the last record was of type XT_LOG_ENT_END_OF_LOG
2054- * which fills the log to align to block of size 512.
2055- */
2056- bytes_read += ws.ws_seqread.xseq_record_len;
2057- if (!record)
2058- break;
2059-#ifdef PRINT_LOG_ON_RECOVERY
2060- xt_print_log_record(ws.ws_seqread.xseq_rec_log_id, ws.ws_seqread.xseq_rec_log_offset, record);
2061-#endif
2062- if (print_progress && bytes_read > next_goal) {
2063- if (((perc_complete - 1) % 25) == 0)
2064- xt_logf(XT_NT_INFO, "PBXT: ");
2065- if ((perc_complete % 25) == 0)
2066- xt_logf(XT_NT_INFO, "%2d\n", (int) perc_complete);
2067- else
2068- xt_logf(XT_NT_INFO, "%2d ", (int) perc_complete);
2069- xt_log_flush(self);
2070- xres_recover_progress(self, &progress_file, perc_complete);
2071- next_goal += perc_size;
2072- perc_complete++;
2073- }
2074- switch (record->xl.xl_status_1) {
2075- case XT_LOG_ENT_HEADER:
2076- break;
2077- case XT_LOG_ENT_NEW_LOG: {
2078- /* Adjust the bytes read for the fact that logs are written
2079- * on 512 byte boundaries.
2080- */
2081- off_t offs, eof = ws.ws_seqread.xseq_log_eof;
2082-
2083- offs = ws.ws_seqread.xseq_rec_log_offset + ws.ws_seqread.xseq_record_len;
2084- if (eof > offs)
2085- bytes_read += eof - offs;
2086- if (!db->db_xlog.xlog_seq_start(&ws.ws_seqread, XT_GET_DISK_4(record->xl.xl_log_id_4), 0, TRUE))
2087- xt_throw(self);
2088- break;
2089- }
2090- case XT_LOG_ENT_NEW_TAB:
2091- tab_id = XT_GET_DISK_4(record->xt.xt_tab_id_4);
2092- if (tab_id > db->db_curr_tab_id)
2093- db->db_curr_tab_id = tab_id;
2094- break;
2095- case XT_LOG_ENT_UPDATE_BG:
2096- case XT_LOG_ENT_INSERT_BG:
2097- case XT_LOG_ENT_DELETE_BG:
2098- xn_id = XT_GET_DISK_4(record->xu.xu_xact_id_4);
2099- goto start_xact;
2100- case XT_LOG_ENT_UPDATE_FL_BG:
2101- case XT_LOG_ENT_INSERT_FL_BG:
2102- case XT_LOG_ENT_DELETE_FL_BG:
2103- xn_id = XT_GET_DISK_4(record->xf.xf_xact_id_4);
2104- start_xact:
2105- if (xt_xn_is_before(db->db_xn_curr_id, xn_id))
2106- db->db_xn_curr_id = xn_id;
2107-
2108- if (!(xact = xt_xn_add_old_xact(db, xn_id, self)))
2109- xt_throw(self);
2110-
2111- xact->xd_begin_log = ws.ws_seqread.xseq_rec_log_id;
2112- xact->xd_begin_offset = ws.ws_seqread.xseq_rec_log_offset;
2113-
2114- xact->xd_end_xn_id = xn_id;
2115- xact->xd_end_time = db->db_xn_end_time;
2116- xact->xd_flags = (XT_XN_XAC_LOGGED | XT_XN_XAC_ENDED | XT_XN_XAC_RECOVERED | XT_XN_XAC_SWEEP);
2117-
2118- /* This may affect the "minimum RAM transaction": */
2119- if (!min_ram_xn_id_set || xt_xn_is_before(xn_id, db->db_xn_min_ram_id)) {
2120- min_ram_xn_id_set = TRUE;
2121- db->db_xn_min_ram_id = xn_id;
2122- }
2123- xt_xres_apply_in_order(self, &ws, ws.ws_seqread.xseq_rec_log_id, ws.ws_seqread.xseq_rec_log_offset, record);
2124- break;
2125- case XT_LOG_ENT_COMMIT:
2126- case XT_LOG_ENT_ABORT:
2127- xn_id = XT_GET_DISK_4(record->xe.xe_xact_id_4);
2128- if ((xact = xt_xn_get_xact(db, xn_id, self))) {
2129- xact->xd_end_xn_id = xn_id;
2130- xact->xd_flags |= XT_XN_XAC_ENDED | XT_XN_XAC_SWEEP;
2131- xact->xd_flags &= ~XT_XN_XAC_RECOVERED; // We can expect an end record on cleanup!
2132- if (record->xl.xl_status_1 == XT_LOG_ENT_COMMIT)
2133- xact->xd_flags |= XT_XN_XAC_COMMITTED;
2134- }
2135- break;
2136- case XT_LOG_ENT_CLEANUP:
2137- /* The transaction was cleaned up: */
2138- xn_id = XT_GET_DISK_4(record->xc.xc_xact_id_4);
2139- xt_xn_delete_xact(db, xn_id, self);
2140- break;
2141- case XT_LOG_ENT_OP_SYNC:
2142- xres_sync_operations(self, db, &ws);
2143- break;
2144- case XT_LOG_ENT_DEL_LOG:
2145- xtLogID rec_log_id;
2146-
2147- rec_log_id = XT_GET_DISK_4(record->xl.xl_log_id_4);
2148- xt_dl_set_to_delete(self, db, rec_log_id);
2149- break;
2150- default:
2151- xt_xres_apply_in_order(self, &ws, ws.ws_seqread.xseq_rec_log_id, ws.ws_seqread.xseq_rec_log_offset, record);
2152- break;
2153- }
2154- }
2155-
2156- if (xres_sync_operations(self, db, &ws)) {
2157- XTactOpSyncEntryDRec op_sync;
2158- time_t now = time(NULL);
2159-
2160- op_sync.os_status_1 = XT_LOG_ENT_OP_SYNC;
2161- op_sync.os_checksum_1 = XT_CHECKSUM_1(now) ^ XT_CHECKSUM_1(ws.ws_seqread.xseq_rec_log_id);
2162- XT_SET_DISK_4(op_sync.os_time_4, (xtWord4) now);
2163- /* TODO: If this is done, check to see that
2164- * the byte written here are read back by the writter.
2165- * This is in order to be in sync with 'xl_log_bytes_written'.
2166- * i.e. xl_log_bytes_written == xl_log_bytes_read
2167- */
2168- if (!db->db_xlog.xlog_write_thru(&ws.ws_seqread, sizeof(XTactOpSyncEntryDRec), (xtWord1 *) &op_sync, self))
2169- xt_throw(self);
2170- }
2171- }
2172- catch_(a) {
2173- ok = FALSE;
2174- }
2175- cont_(a);
2176-
2177- if (ok) {
2178- if (print_progress) {
2179- while (perc_complete <= 100) {
2180- if (((perc_complete - 1) % 25) == 0)
2181- xt_logf(XT_NT_INFO, "PBXT: ");
2182- if ((perc_complete % 25) == 0)
2183- xt_logf(XT_NT_INFO, "%2d\n", (int) perc_complete);
2184- else
2185- xt_logf(XT_NT_INFO, "%2d ", (int) perc_complete);
2186- xt_log_flush(self);
2187- xres_recover_progress(self, &progress_file, perc_complete);
2188- perc_complete++;
2189- }
2190- }
2191- if (bytes_to_read != 0)
2192- xt_logf(XT_NT_INFO, "PBXT: Recovering complete at %lu-%llu, bytes read: %llu\n", (u_long) ws.ws_seqread.xseq_rec_log_id, (u_llong) ws.ws_seqread.xseq_rec_log_offset, (u_llong) bytes_read);
2193-
2194- *log_id = ws.ws_seqread.xseq_rec_log_id;
2195- *log_offset = ws.ws_seqread.xseq_rec_log_offset;
2196-
2197- if (!min_ram_xn_id_set)
2198- /* This is true because if no transaction was placed in RAM then
2199- * the next transaction in RAM will have the next ID: */
2200- db->db_xn_min_ram_id = db->db_xn_curr_id + 1;
2201- }
2202-
2203- failed:
2204- xt_free_writer_state(self, &ws);
2205- self->st_dlog_buf.dlb_exit(self);
2206- xres_recover_progress(self, &progress_file, 101);
2207- return ok;
2208-}
2209-
2210-xtBool XTXactRestart::xres_is_checkpoint_pending(xtLogID curr_log_id, xtLogOffset curr_log_offset)
2211-{
2212- return xt_bytes_since_last_checkpoint(xres_db, curr_log_id, curr_log_offset) >= xt_db_checkpoint_frequency / 2;
2213-}
2214-
/*
 * Calculate the bytes to be read for recovery.
 * This is only an estimate of the number of bytes that
 * will be read.
 *
 * Starting with the checkpoint log (xres_cp_log_id), consecutive
 * transaction logs are opened and their headers checked. The scan stops
 * at the first log file that does not exist, or whose header is not
 * valid. A header is invalid if it is too short, has the wrong status
 * byte, checksum or magic, or records a different log ID.
 *
 * For the checkpoint log, only the bytes after xres_cp_log_offset are
 * counted. For every later log, the full file size is counted.
 *
 * log_count   [out] number of logs counted.
 * max_log_id  [out] ID of the last log counted (xres_cp_log_id if none).
 * Returns the estimated number of bytes to read.
 * Throws on open/read errors. Throws XT_ERR_NEW_TYPE_OF_XLOG if a log
 * header reports a version newer than XT_LOG_VERSION_NO.
 */
off_t XTXactRestart::xres_bytes_to_read(XTThreadPtr self, XTDatabaseHPtr db, u_int *log_count, xtLogID *max_log_id)
{
	off_t					to_read = 0, eof;
	xtLogID					log_id = xres_cp_log_id;
	char					log_path[PATH_MAX];
	XTOpenFilePtr			of;
	XTXactLogHeaderDRec		log_head;
	size_t					head_size;
	size_t					red_size;

	*max_log_id = log_id;
	*log_count = 0;
	for (;;) {
		db->db_xlog.xlog_name(PATH_MAX, log_path, log_id);
		of = NULL;
		if (!xt_open_file_ns(&of, log_path, XT_FS_MISSING_OK))
			xt_throw(self);
		/* The log does not exist: end of the log sequence. */
		if (!of)
			break;
		pushr_(xt_close_file, of);

		/* Check the first record of the log, to see if it is valid. */
		if (!xt_pread_file(of, 0, sizeof(XTXactLogHeaderDRec), 0, (xtWord1 *) &log_head, &red_size, &self->st_statistics.st_xlog, self))
			xt_throw(self);
		/* The minimum size (old log size): */
		if (red_size < XT_MIN_LOG_HEAD_SIZE)
			goto done;
		/* NOTE(review): head_size comes from disk and is not checked against
		 * red_size or sizeof(log_head) before it is passed to
		 * XT_LOG_HEAD_MAGIC(). If that macro reads at offset head_size,
		 * this could read past log_head. Confirm.
		 */
		head_size = XT_GET_DISK_4(log_head.xh_size_4);
		if (log_head.xh_status_1 != XT_LOG_ENT_HEADER)
			goto done;
		if (log_head.xh_checksum_1 != XT_CHECKSUM_1(log_id))
			goto done;
		if (XT_LOG_HEAD_MAGIC(&log_head, head_size) != XT_LOG_FILE_MAGIC)
			goto done;
		/* Header fields are only checked if the stored header is large
		 * enough to contain them:
		 */
		if (head_size > offsetof(XTXactLogHeaderDRec, xh_log_id_4) + 4) {
			if (XT_GET_DISK_4(log_head.xh_log_id_4) != log_id)
				goto done;
		}
		/* NOTE(review): xh_version_2 is a 2-byte field, but this test uses
		 * "+ 4" like the 4-byte field above. Confirm this is intended.
		 */
		if (head_size > offsetof(XTXactLogHeaderDRec, xh_version_2) + 4) {
			if (XT_GET_DISK_2(log_head.xh_version_2) > XT_LOG_VERSION_NO)
				xt_throw_ulxterr(XT_CONTEXT, XT_ERR_NEW_TYPE_OF_XLOG, (u_long) log_id);
		}

		eof = xt_seek_eof_file(self, of);
		freer_(); // xt_close_file(of)
		/* Only the part after the checkpoint position has to be replayed: */
		if (log_id == xres_cp_log_id)
			to_read += (eof - xres_cp_log_offset);
		else
			to_read += eof;
		(*log_count)++;
		*max_log_id = log_id;
		log_id++;
	}
	return to_read;

	done:
	/* Invalid header: close this log and stop counting. */
	freer_(); // xt_close_file(of)
	return to_read;
}
2279-
2280-
2281-/* ----------------------------------------------------------------------
2282- * C H E C K P O I N T P R O C E S S
2283- */
2284-
2285-typedef enum XTFileType {
2286- XT_FT_RECROW_FILE,
2287- XT_FT_INDEX_FILE
2288-} XTFileType;
2289-
2290-typedef struct XTDirtyFile {
2291- xtTableID df_tab_id;
2292- XTFileType df_file_type;
2293-} XTDirtyFileRec, *XTDirtyFilePtr;
2294-
2295-#define XT_MAX_FLUSH_FILES 200
2296-#define XT_FLUSH_THRESHOLD (2 * 1024 * 1024)
2297-
/*
 * Select and flush dirty files: the record/row file and/or the index file
 * of each table with at least XT_FLUSH_THRESHOLD bytes of unflushed
 * changes.
 *
 * Files are first collected (at most XT_MAX_FLUSH_FILES per pass) while
 * holding the table hash lock. They are then flushed one by one, with a
 * 400ms pause before each file. If the list was full, another pass is made.
 *
 * Currently not compiled: this is only built when USE_LATER is defined.
 */
#ifdef USE_LATER
static void xres_cp_flush_files(XTThreadPtr self, XTDatabaseHPtr db)
{
	u_int			edx;
	XTTableEntryPtr	te;
	XTDirtyFileRec	flush_list[XT_MAX_FLUSH_FILES];
	u_int			file_count;
	XTIndexPtr		*iptr;
	u_int			dirty_blocks;
	XTOpenTablePtr	ot;
	XTTableHPtr		tab;

	retry:
	/* Every pass starts with an empty list. Previously the count was kept
	 * across passes, so a full list was flushed again on every retry and
	 * never refilled, which looped forever.
	 */
	file_count = 0;
	xt_enum_tables_init(&edx);
	xt_ht_lock(self, db->db_tables);
	pushr_(xt_ht_unlock, db->db_tables);
	while (file_count < XT_MAX_FLUSH_FILES &&
		(te = xt_enum_tables_next(self, db, &edx))) {
		if ((tab = te->te_table)) {
			if (tab->tab_bytes_to_flush >= XT_FLUSH_THRESHOLD) {
				flush_list[file_count].df_tab_id = te->te_tab_id;
				flush_list[file_count].df_file_type = XT_FT_RECROW_FILE;
				file_count++;
			}
			if (file_count == XT_MAX_FLUSH_FILES)
				break;
			/* Sum the dirty blocks of all the table's indices: */
			iptr = tab->tab_dic.dic_keys;
			dirty_blocks = 0;
			for (u_int i=0;i<tab->tab_dic.dic_key_count; i++) {
				dirty_blocks += (*iptr)->mi_dirty_blocks;
				iptr++;
			}
			if ((dirty_blocks * XT_INDEX_PAGE_SIZE) >= XT_FLUSH_THRESHOLD) {
				flush_list[file_count].df_tab_id = te->te_tab_id;
				flush_list[file_count].df_file_type = XT_FT_INDEX_FILE;
				file_count++;
			}
		}
	}
	freer_(); // xt_ht_unlock(db->db_tables)

	for (u_int i=0;i<file_count && !self->t_quit; i++) {
		/* We want to flush about once a second: */
		xt_sleep_milli_second(400);
		if ((ot = xt_db_open_pool_table(self, db, flush_list[i].df_tab_id, NULL, TRUE))) {
			pushr_(xt_db_return_table_to_pool, ot);

			/* Same argument form as the calls in xres_cp_checkpoint()
			 * (third argument FALSE). The old 2-argument form used here
			 * does not match those calls.
			 */
			if (flush_list[i].df_file_type == XT_FT_RECROW_FILE) {
				if (!xt_flush_record_row(ot, NULL, FALSE))
					xt_throw(self);
			}
			else {
				if (!xt_flush_indices(ot, NULL, FALSE))
					xt_throw(self);
			}

			freer_(); // xt_db_return_table_to_pool(ot)
		}
	}

	/* The list was full, so there may be more dirty files: collect again,
	 * unless we have been asked to quit. (Was a stale hard-coded 100.)
	 */
	if (file_count == XT_MAX_FLUSH_FILES && !self->t_quit)
		goto retry;
}
#endif
2363-
/* NOTE(review): dead code. Unless "xxx" is defined (presumably never),
 * nothing up to the matching #endif is compiled. The statements after
 * xres_checkpoint_pending() are not inside any function, so this would
 * not compile if enabled. It appears to be kept only as a reference.
 */
#ifdef xxx
void XTXactRestart::xres_checkpoint_pending(xtLogID log_id, xtLogOffset log_offset)
{
#ifdef TRACE_CHECKPOINT_ACTIVITY
	xtBool tmp = xres_cp_pending;
#endif
	xres_cp_pending = xres_is_checkpoint_pending(log_id, log_offset);
#ifdef TRACE_CHECKPOINT_ACTIVITY
	if (tmp) {
		if (!xres_cp_pending)
			printf("%s xres_cp_pending = FALSE\n", xt_get_self()->t_name);
	}
	else {
		if (xres_cp_pending)
			printf("%s xres_cp_pending = TRUE\n", xt_get_self()->t_name);
	}
#endif
}


	xres_checkpoint_pending();

	if (!xres_cp_required &&
		!xres_cp_pending &&
		xt_sl_get_size(db->db_datalogs.dlc_to_delete) == 0 &&
		xt_sl_get_size(db->db_datalogs.dlc_deleted) == 0)
		return FALSE;
#endif
2392-
#ifdef NEVER_CHECKPOINT
/* Debug switch: while TRUE, xres_cp_checkpoint() returns FALSE at once. */
xtBool no_checkpoint = TRUE;
#endif

/* Modes of xres_cp_checkpoint(), selected by its force_checkpoint flag: */
enum {
	/* End the checkpoint attempt once something has been flushed and
	 * write activity has been detected. */
	XT_CHECKPOINT_IF_NO_ACTIVITY = 0,
	/* Sleep 400ms after more than 2MB has been flushed while writers
	 * were active. */
	XT_CHECKPOINT_PAUSE_IF_ACTIVITY = 1,
	/* Flush without stopping or pausing for write activity. */
	XT_CHECKPOINT_NO_PAUSE = 2
};
2400-
2401-/*
2402- * This function performs table flush, as long as the system is idle.
2403- */
2404-static xtBool xres_cp_checkpoint(XTThreadPtr self, XTDatabaseHPtr db, u_int curr_writer_total, xtBool force_checkpoint)
2405-{
2406- XTCheckPointStatePtr cp = &db->db_cp_state;
2407- XTOpenTablePtr ot;
2408- XTCheckPointTablePtr to_flush_ptr;
2409- XTCheckPointTableRec to_flush;
2410- u_int table_count = 0;
2411- xtBool checkpoint_done;
2412- off_t bytes_flushed = 0;
2413- int check_type;
2414-
2415-#ifdef NEVER_CHECKPOINT
2416- if (no_checkpoint)
2417- return FALSE;
2418-#endif
2419- if (force_checkpoint) {
2420- if (db->db_restart.xres_cp_required)
2421- check_type = XT_CHECKPOINT_NO_PAUSE;
2422- else
2423- check_type = XT_CHECKPOINT_PAUSE_IF_ACTIVITY;
2424- }
2425- else
2426- check_type = XT_CHECKPOINT_IF_NO_ACTIVITY;
2427-
2428- to_flush.cpt_tab_id = 0;
2429- to_flush.cpt_flushed = 0;
2430-
2431- /* Start a checkpoint: */
2432- if (!xt_begin_checkpoint(db, FALSE, self))
2433- xt_throw(self);
2434-
2435- while (!self->t_quit) {
2436- xt_lock_mutex_ns(&cp->cp_state_lock);
2437- table_count = 0;
2438- if (cp->cp_table_ids)
2439- table_count = xt_sl_get_size(cp->cp_table_ids);
2440- if (!cp->cp_running || cp->cp_flush_count >= table_count) {
2441- xt_unlock_mutex_ns(&cp->cp_state_lock);
2442- break;
2443- }
2444- if (cp->cp_next_to_flush > table_count)
2445- cp->cp_next_to_flush = 0;
2446-
2447- to_flush_ptr = (XTCheckPointTablePtr) xt_sl_item_at(cp->cp_table_ids, cp->cp_next_to_flush);
2448- if (to_flush_ptr)
2449- to_flush = *to_flush_ptr;
2450- xt_unlock_mutex_ns(&cp->cp_state_lock);
2451-
2452- if (to_flush_ptr) {
2453- if ((ot = xt_db_open_pool_table(self, db, to_flush.cpt_tab_id, NULL, TRUE))) {
2454- pushr_(xt_db_return_table_to_pool, ot);
2455-
2456- if (!(to_flush.cpt_flushed & XT_CPT_REC_ROW_FLUSHED)) {
2457- if (!xt_flush_record_row(ot, &bytes_flushed, FALSE))
2458- xt_throw(self);
2459- }
2460-
2461- xt_lock_mutex_ns(&cp->cp_state_lock);
2462- to_flush_ptr = NULL;
2463- if (cp->cp_running)
2464- to_flush_ptr = (XTCheckPointTablePtr) xt_sl_item_at(cp->cp_table_ids, cp->cp_next_to_flush);
2465- if (to_flush_ptr)
2466- to_flush = *to_flush_ptr;
2467- xt_unlock_mutex_ns(&cp->cp_state_lock);
2468-
2469- if (to_flush_ptr && !self->t_quit) {
2470- if (!(to_flush.cpt_flushed & XT_CPT_INDEX_FLUSHED)) {
2471- switch (check_type) {
2472- case XT_CHECKPOINT_IF_NO_ACTIVITY:
2473- if (bytes_flushed > 0 && curr_writer_total != db->db_xn_total_writer_count) {
2474- freer_(); // xt_db_return_table_to_pool(ot)
2475- goto end_checkpoint;
2476- }
2477- break;
2478- case XT_CHECKPOINT_PAUSE_IF_ACTIVITY:
2479- if (bytes_flushed > 2 * 1024 * 1024 && curr_writer_total != db->db_xn_total_writer_count) {
2480- curr_writer_total = db->db_xn_total_writer_count;
2481- bytes_flushed = 0;
2482- xt_sleep_milli_second(400);
2483- }
2484- break;
2485- case XT_CHECKPOINT_NO_PAUSE:
2486- break;
2487- }
2488-
2489- if (!self->t_quit) {
2490- if (!xt_flush_indices(ot, &bytes_flushed, FALSE))
2491- xt_throw(self);
2492- to_flush.cpt_flushed |= XT_CPT_INDEX_FLUSHED;
2493- }
2494- }
2495- }
2496-
2497- freer_(); // xt_db_return_table_to_pool(ot)
2498- }
2499-
2500- if ((to_flush.cpt_flushed & XT_CPT_ALL_FLUSHED) == XT_CPT_ALL_FLUSHED)
2501- cp->cp_next_to_flush++;
2502- }
2503- else
2504- cp->cp_next_to_flush++;
2505-
2506- if (self->t_quit)
2507- break;
2508-
2509- switch (check_type) {
2510- case XT_CHECKPOINT_IF_NO_ACTIVITY:
2511- if (bytes_flushed > 0 && curr_writer_total != db->db_xn_total_writer_count)
2512- goto end_checkpoint;
2513- break;
2514- case XT_CHECKPOINT_PAUSE_IF_ACTIVITY:
2515- if (bytes_flushed > 2 * 1024 * 1024 && curr_writer_total != db->db_xn_total_writer_count) {
2516- curr_writer_total = db->db_xn_total_writer_count;
2517- bytes_flushed = 0;
2518- xt_sleep_milli_second(400);
2519- }
2520- break;
2521- case XT_CHECKPOINT_NO_PAUSE:
2522- break;
2523- }
2524- }
2525-
2526- end_checkpoint:
2527- if (!xt_end_checkpoint(db, self, &checkpoint_done))
2528- xt_throw(self);
2529- return checkpoint_done;
2530-}
2531-
2532-
2533-/* Wait for the log writer to tell us to do something.
2534- */
2535-static void xres_cp_wait_for_log_writer(XTThreadPtr self, XTDatabaseHPtr db, u_long milli_secs)
2536-{
2537- xt_lock_mutex(self, &db->db_cp_lock);
2538- pushr_(xt_unlock_mutex, &db->db_cp_lock);
2539- if (!self->t_quit)
2540- xt_timed_wait_cond(self, &db->db_cp_cond, &db->db_cp_lock, milli_secs);
2541- freer_(); // xt_unlock_mutex(&db->db_cp_lock)
2542-}
2543-
/*
 * This is the way checkpoint works:
 *
 * To write a checkpoint we need to flush all tables in
 * the database.
 *
 * Before flushing the first table we get the checkpoint
 * log position.
 *
 * After flushing all files we write the checkpoint
 * log position.
 *
 * xres_cp_main() is the main loop of the checkpointer thread, and runs
 * until self->t_quit is set. Each iteration:
 * - Waits about 2 seconds (less if a checkpoint is required), calling
 *   xt_db_free_unused_open_tables() while waiting.
 * - If db_xn_total_writer_count did not change (no write activity),
 *   calls xres_cp_checkpoint() in its "only while idle" mode.
 * - Otherwise, forces a checkpoint only if one is required or pending
 *   (xres_is_checkpoint_pending()).
 * - If there is still no write activity, keeps waiting. The wait ends
 *   when activity starts, or when there is checkpoint work to do: data
 *   logs to delete or deleted, or a writer position beyond the last
 *   checkpoint position.
 */
static void xres_cp_main(XTThreadPtr self)
{
	XTDatabaseHPtr	db = self->st_database;
	u_int			curr_writer_total;
	time_t			now;

	xt_set_low_priority(self);


	while (!self->t_quit) {
		/* Wait 2 seconds (or until a checkpoint is required): */
		curr_writer_total = db->db_xn_total_writer_count;
		xt_db_approximate_time = time(NULL);
		now = xt_db_approximate_time;
		while (!self->t_quit && xt_db_approximate_time < now + 2 && !db->db_restart.xres_cp_required) {
			xres_cp_wait_for_log_writer(self, db, 400);
			xt_db_approximate_time = time(NULL);
			xt_db_free_unused_open_tables(self, db);
		}

		if (self->t_quit)
			break;

		if (curr_writer_total == db->db_xn_total_writer_count)
			/* No activity in 2 seconds: */
			xres_cp_checkpoint(self, db, curr_writer_total, FALSE);
		else {
			/* The server is busy, check if we need to
			 * write a checkpoint anyway...
			 */
			if (db->db_restart.xres_cp_required ||
				db->db_restart.xres_is_checkpoint_pending(db->db_xlog.xl_write_log_id, db->db_xlog.xl_write_log_offset)) {
				/* Flush tables, until the checkpoint is complete. */
				xres_cp_checkpoint(self, db, curr_writer_total, TRUE);
			}
		}

		if (curr_writer_total == db->db_xn_total_writer_count) {
			/* We did a checkpoint, and still, nothing has
			 * happened....
			 *
			 * Wait for something to happen:
			 */
			xtLogID		log_id;
			xtLogOffset	log_offset;

			while (!self->t_quit && curr_writer_total == db->db_xn_total_writer_count) {
				/* The writer position: */
				xt_lock_mutex(self, &db->db_wr_lock);
				pushr_(xt_unlock_mutex, &db->db_wr_lock);
				log_id = db->db_wr_log_id;
				log_offset = db->db_wr_log_offset;
				freer_(); // xt_unlock_mutex(&db->db_wr_lock)

				/* This condition means we could checkpoint: there are data
				 * logs to delete (or deleted), or the writer is beyond the
				 * last checkpoint position.
				 */
				if (!(xt_sl_get_size(db->db_datalogs.dlc_to_delete) == 0 &&
					xt_sl_get_size(db->db_datalogs.dlc_deleted) == 0 &&
					xt_comp_log_pos(log_id, log_offset, db->db_restart.xres_cp_log_id, db->db_restart.xres_cp_log_offset) <= 0))
					break;

				xres_cp_wait_for_log_writer(self, db, 400);
				xt_db_approximate_time = time(NULL);
				xt_db_free_unused_open_tables(self, db);
			}
		}
	}
}
2623-
2624-static void *xres_cp_run_thread(XTThreadPtr self)
2625-{
2626- XTDatabaseHPtr db = (XTDatabaseHPtr) self->t_data;
2627- int count;
2628- void *mysql_thread;
2629-
2630- mysql_thread = myxt_create_thread();
2631-
2632- while (!self->t_quit) {
2633- try_(a) {
2634- /*
2635- * The garbage collector requires that the database
2636- * is in use because.
2637- */
2638- xt_use_database(self, db, XT_FOR_CHECKPOINTER);
2639-
2640- /* This action is both safe and required (see details elsewhere) */
2641- xt_heap_release(self, self->st_database);
2642-
2643- xres_cp_main(self);
2644- }
2645- catch_(a) {
2646- /* This error is "normal"! */
2647- if (self->t_exception.e_xt_err != XT_ERR_NO_DICTIONARY &&
2648- !(self->t_exception.e_xt_err == XT_SIGNAL_CAUGHT &&
2649- self->t_exception.e_sys_err == SIGTERM))
2650- xt_log_and_clear_exception(self);
2651- }
2652- cont_(a);
2653-
2654- /* Avoid releasing the database (done above) */
2655- self->st_database = NULL;
2656- xt_unuse_database(self, self);
2657-
2658- /* After an exception, pause before trying again... */
2659- /* Number of seconds */
2660- count = 60;
2661- while (!self->t_quit && count > 0) {
2662- sleep(1);
2663- count--;
2664- }
2665- }
2666-
2667- myxt_destroy_thread(mysql_thread, TRUE);
2668- return NULL;
2669-}
2670-
2671-static void xres_cp_free_thread(XTThreadPtr self, void *data)
2672-{
2673- XTDatabaseHPtr db = (XTDatabaseHPtr) data;
2674-
2675- if (db->db_cp_thread) {
2676- xt_lock_mutex(self, &db->db_cp_lock);
2677- pushr_(xt_unlock_mutex, &db->db_cp_lock);
2678- db->db_cp_thread = NULL;
2679- freer_(); // xt_unlock_mutex(&db->db_cp_lock)
2680- }
2681-}
2682-
2683-/* Start a checkpoint, if none has been started. The restart position is the minimum of the writer position, the begin position of every logged-but-not-cleaned transaction, and the first queued operation of every open table; the tables seen are recorded in cp->cp_table_ids for flushing. have_table_lock is TRUE when the caller already holds the db->db_tables lock. Returns FAILED only if the table list cannot be built. */
2684-xtPublic xtBool xt_begin_checkpoint(XTDatabaseHPtr db, xtBool have_table_lock, XTThreadPtr thread)
2685-{
2686- XTCheckPointStatePtr cp = &db->db_cp_state;
2687- xtLogID log_id;
2688- xtLogOffset log_offset;
2689- xtLogID ind_rec_log_id;
2690- xtLogOffset ind_rec_log_offset;
2691- u_int edx;
2692- XTTableEntryPtr te_ptr;
2693- XTTableHPtr tab;
2694- XTOperationPtr op;
2695- XTCheckPointTableRec cpt;
2696- XTSortedListPtr tables = NULL;
2697-
2698- /* First check if a checkpoint is already running: */
2699- xt_lock_mutex_ns(&cp->cp_state_lock);
2700- if (cp->cp_running) {
2701- xt_unlock_mutex_ns(&cp->cp_state_lock);
2702- return OK;
2703- }
2704- if (cp->cp_table_ids) {
2705- xt_free_sortedlist(NULL, cp->cp_table_ids);
2706- cp->cp_table_ids = NULL;
2707- }
2708- xt_unlock_mutex_ns(&cp->cp_state_lock);
2709-
2710- /* Flush the log before we continue. This is to ensure that
2711- * before we write a checkpoint, that the changes
2712- * done by the sweeper and the compactor, have been
2713- * applied.
2714- *
2715- * Note, the sweeper does not flush the log, so this is
2716- * necessary!
2717- *
2718- * --- I have removed this flush. It is actually just a
2719- * minor optimisation, which pushes the flush position
2720- * below ahead.
2721- *
2722- * Note that the writer position used for the checkpoint
2723- * _will_ be behind the current log flush position.
2724- *
2725- * This is because the writer cannot apply log changes
2726- * until they are flushed.
2727- */
2728- /* This is an alternative to the above.
2729- if (!xt_xlog_flush_log(self))
2730- xt_throw(self);
2731- */
2732- xt_lock_mutex_ns(&db->db_wr_lock);
2733-
2734- /* The theoretical maximum restart log postion, is the
2735- * position of the writer thread:
2736- */
2737- log_id = db->db_wr_log_id;
2738- log_offset = db->db_wr_log_offset;
2739-
2740- ind_rec_log_id = db->db_xlog.xl_flush_log_id;
2741- ind_rec_log_offset = db->db_xlog.xl_flush_log_offset;
2742-
2743- xt_unlock_mutex_ns(&db->db_wr_lock);
2744-
2745- /* Go through all the transactions, and find
2746- * the lowest log start position of all the transactions.
2747- */
2748- for (u_int i=0; i<XT_XN_NO_OF_SEGMENTS; i++) {
2749- XTXactSegPtr seg;
2750-
2751- seg = &db->db_xn_idx[i];
2752- XT_XACT_READ_LOCK(&seg->xs_tab_lock, self); /* NOTE(review): `self` is not a parameter of this function (the thread is `thread`) -- confirm the macro ignores it */
2753- for (u_int j=0; j<XT_XN_HASH_TABLE_SIZE; j++) {
2754- XTXactDataPtr xact;
2755-
2756- xact = seg->xs_table[j];
2757- while (xact) {
2758- /* If the transaction is logged, but not cleaned: */
2759- if ((xact->xd_flags & (XT_XN_XAC_LOGGED | XT_XN_XAC_CLEANED)) == XT_XN_XAC_LOGGED) {
2760- if (xt_comp_log_pos(log_id, log_offset, xact->xd_begin_log, xact->xd_begin_offset) > 0) {
2761- log_id = xact->xd_begin_log;
2762- log_offset = xact->xd_begin_offset;
2763- }
2764- }
2765- xact = xact->xd_next_xact;
2766- }
2767- }
2768- XT_XACT_UNLOCK(&seg->xs_tab_lock, self, FALSE); /* NOTE(review): same `self` vs `thread` question as above */
2769- }
2770-
2771-#ifdef TRACE_CHECKPOINT
2772- printf("BEGIN CHECKPOINT %d-%llu\n", (int) log_id, (u_llong) log_offset);
2773-#endif
2774- /* Go through all tables, and find the lowest log position.
2775- * The log position stored by each table shows the position of
2776- * the next operation that still needs to be applied.
2777- *
2778- * This comes from the list of operations which are
2779- * queued for the table.
2780- *
2781- * This function also builds a list of tables!
2782- */
2783-
2784- if (!(tables = xt_new_sortedlist_ns(sizeof(XTCheckPointTableRec), 20, xres_comp_flush_tabs, NULL, NULL)))
2785- return FAILED;
2786-
2787- xt_enum_tables_init(&edx);
2788- if (!have_table_lock)
2789- xt_ht_lock(NULL, db->db_tables);
2790- while ((te_ptr = xt_enum_tables_next(NULL, db, &edx))) {
2791- if ((tab = te_ptr->te_table)) {
2792- xt_sl_lock_ns(tab->tab_op_list, thread);
2793- if ((op = (XTOperationPtr) xt_sl_first_item(tab->tab_op_list))) {
2794- if (xt_comp_log_pos(log_id, log_offset, op->or_log_id, op->or_log_offset) > 0) {
2795- log_id = op->or_log_id;
2796- log_offset = op->or_log_offset;
2797- }
2798- }
2799- xt_sl_unlock(NULL, tab->tab_op_list);
2800- cpt.cpt_flushed = 0;
2801- cpt.cpt_tab_id = tab->tab_id;
2802-#ifdef TRACE_CHECKPOINT
2803- printf("to flush: %d %s\n", (int) tab->tab_id, tab->tab_name->ps_path);
2804-#endif
2805- if (!xt_sl_insert(NULL, tables, &tab->tab_id, &cpt)) {
2806- if (!have_table_lock)
2807- xt_ht_unlock(NULL, db->db_tables);
2808- xt_free_sortedlist(NULL, tables);
2809- return FAILED;
2810- }
2811- }
2812- }
2813- if (!have_table_lock)
2814- xt_ht_unlock(NULL, db->db_tables);
2815-
2816- xt_lock_mutex_ns(&cp->cp_state_lock);
2817- /* If a checkpoint is already running, then someone was faster than me! (NOTE(review): a position with log_id or log_offset equal to 0 also skips starting the checkpoint -- confirm intended) */
2818- if (!cp->cp_running && log_id && log_offset) {
2819- cp->cp_running = TRUE;
2820- cp->cp_log_id = log_id;
2821- cp->cp_log_offset = log_offset;
2822-
2823- cp->cp_ind_rec_log_id = ind_rec_log_id;
2824- cp->cp_ind_rec_log_offset = ind_rec_log_offset;
2825-
2826- cp->cp_flush_count = 0;
2827- cp->cp_next_to_flush = 0;
2828- cp->cp_table_ids = tables;
2829- }
2830- else
2831- xt_free_sortedlist(NULL, tables);
2832- xt_unlock_mutex_ns(&cp->cp_state_lock);
2833-
2834- /* At this point, log flushing can begin... */
2835- return OK;
2836-}
2837-
2838-/* End a checkpoint, if a checkpoint has been started,
2839- * and all checkpoint tables have been flushed. If anything changed since the last checkpoint, the checkpoint record is written to the next restart file (alternating 1/2) and logs no longer needed for recovery are deleted. *checkpoint_done (if not NULL) is set FALSE while tables remain unflushed, TRUE otherwise.
2840- */
2841-xtPublic xtBool xt_end_checkpoint(XTDatabaseHPtr db, XTThreadPtr thread, xtBool *checkpoint_done)
2842-{
2843- XTCheckPointStatePtr cp = &db->db_cp_state;
2844- XTXlogCheckpointDPtr cp_buf = NULL;
2845- char path[PATH_MAX];
2846- XTOpenFilePtr of;
2847- u_int table_count;
2848- size_t chk_size = 0;
2849- u_int no_of_logs = 0;
2850-
2851-#ifdef NEVER_CHECKPOINT
2852- return OK;
2853-#endif
2854- /* Lock the checkpoint state so that only on thread can do this! */
2855- xt_lock_mutex_ns(&cp->cp_state_lock);
2856- if (!cp->cp_running)
2857- goto checkpoint_done;
2858-
2859- table_count = 0;
2860- if (cp->cp_table_ids)
2861- table_count = xt_sl_get_size(cp->cp_table_ids);
2862- if (cp->cp_flush_count < table_count) {
2863- /* Checkpoint is not done, yet! */
2864- xt_unlock_mutex_ns(&cp->cp_state_lock);
2865- if (checkpoint_done)
2866- *checkpoint_done = FALSE;
2867- return OK;
2868- }
2869-
2870- /* Check if anything has changed since the last checkpoint,
2871- * if not, there is no need to write a new checkpoint!
2872- */
2873- if (xt_sl_get_size(db->db_datalogs.dlc_to_delete) == 0 && /* NOTE(review): read without dlc_lock (taken below) -- confirm benign */
2874- xt_sl_get_size(db->db_datalogs.dlc_deleted) == 0 &&
2875- xt_comp_log_pos(cp->cp_log_id, cp->cp_log_offset, db->db_restart.xres_cp_log_id, db->db_restart.xres_cp_log_offset) <= 0) {
2876- /* A checkpoint is required if the size of the deleted
2877- * list is not zero. The reason is, I cannot remove the
2878- * logs from the deleted list BEFORE a checkpoint has been
2879- * done which does NOT include these logs.
2880- *
2881- * Even though the logs have already been deleted. They
2882- * remain on the deleted list to ensure that they are NOT
2883- * reused during this time, until the next checkpoint.
2884- *
2885- * This is done because if they are used, then on restart
2886- * they would be deleted!
2887- */
2888-#ifdef TRACE_CHECKPOINT
2889- printf("--- END CHECKPOINT - no write\n");
2890-#endif
2891- goto checkpoint_done;
2892- }
2893-
2894-#ifdef TRACE_CHECKPOINT
2895- printf("--- END CHECKPOINT - write start point\n");
2896-#endif
2897- xt_lock_mutex_ns(&db->db_datalogs.dlc_lock);
2898-
2899- no_of_logs = xt_sl_get_size(db->db_datalogs.dlc_to_delete);
2900- chk_size = offsetof(XTXlogCheckpointDRec, xcp_del_log) + no_of_logs * 2; /* 2 bytes per xcp_del_log entry */
2901- xtLogID *log_id_ptr;
2902-
2903- if (!(cp_buf = (XTXlogCheckpointDPtr) xt_malloc_ns(chk_size))) {
2904- xt_unlock_mutex_ns(&db->db_datalogs.dlc_lock);
2905- goto failed_0;
2906- }
2907-
2908- /* Increment the checkpoint number. This value is used if 2 checkpoint have the
2909- * same log number. In this case checkpoints may differ in the log files
2910- * that should be deleted. Here it is important to use the most recent
2911- * log file!
2912- */
2913- db->db_restart.xres_cp_number++;
2914-
2915- /* Create the checkpoint record: */
2916- XT_SET_DISK_4(cp_buf->xcp_head_size_4, chk_size);
2917- XT_SET_DISK_2(cp_buf->xcp_version_2, XT_CHECKPOINT_VERSION);
2918- XT_SET_DISK_6(cp_buf->xcp_chkpnt_no_6, db->db_restart.xres_cp_number);
2919- XT_SET_DISK_4(cp_buf->xcp_log_id_4, cp->cp_log_id);
2920- XT_SET_DISK_6(cp_buf->xcp_log_offs_6, cp->cp_log_offset);
2921- XT_SET_DISK_4(cp_buf->xcp_tab_id_4, db->db_curr_tab_id);
2922- XT_SET_DISK_4(cp_buf->xcp_xact_id_4, db->db_xn_curr_id);
2923- XT_SET_DISK_4(cp_buf->xcp_ind_rec_log_id_4, cp->cp_ind_rec_log_id);
2924- XT_SET_DISK_6(cp_buf->xcp_ind_rec_log_offs_6, cp->cp_ind_rec_log_offset);
2925- XT_SET_DISK_2(cp_buf->xcp_log_count_2, no_of_logs);
2926-
2927- for (u_int i=0; i<no_of_logs; i++) {
2928- log_id_ptr = (xtLogID *) xt_sl_item_at(db->db_datalogs.dlc_to_delete, i);
2929- XT_SET_DISK_2(cp_buf->xcp_del_log[i], (xtWord2) *log_id_ptr); /* NOTE(review): data log IDs are truncated to 16 bits here */
2930- }
2931-
2932- XT_SET_DISK_2(cp_buf->xcp_checksum_2, xt_get_checksum(((xtWord1 *) cp_buf) + 2, chk_size - 2, 1));
2933-
2934- xt_unlock_mutex_ns(&db->db_datalogs.dlc_lock);
2935-
2936- /* Write the checkpoint: */
2937- db->db_restart.xres_name(PATH_MAX, path, db->db_restart.xres_next_res_no);
2938- if (!(of = xt_open_file_ns(path, XT_FS_CREATE | XT_FS_MAKE_PATH)))
2939- goto failed_1;
2940-
2941- if (!xt_set_eof_file(NULL, of, 0))
2942- goto failed_2;
2943- if (!xt_pwrite_file(of, 0, chk_size, (xtWord1 *) cp_buf, &thread->st_statistics.st_x, thread))
2944- goto failed_2;
2945- if (!xt_flush_file(of, &thread->st_statistics.st_x, thread))
2946- goto failed_2;
2947-
2948- xt_close_file_ns(of);
2949-
2950- /* Next time write the other restart file: */
2951- db->db_restart.xres_next_res_no = (db->db_restart.xres_next_res_no % 2) + 1;
2952- db->db_restart.xres_cp_log_id = cp->cp_log_id;
2953- db->db_restart.xres_cp_log_offset = cp->cp_log_offset;
2954- db->db_restart.xres_cp_required = FALSE;
2955-
2956- /*
2957- * Remove all the data logs that were deleted on the
2958- * last checkpoint:
2959- */
2960- if (!xres_remove_data_logs(db))
2961- goto failed_0;
2962-
2963-#ifndef DEBUG_KEEP_LOGS
2964- /* After checkpoint, we can delete transaction logs that will no longer be required
2965- * for recovery...
2966- */
2967- if (cp->cp_log_id > 1) {
2968- xtLogID current_log_id = cp->cp_log_id;
2969- xtLogID del_log_id;
2970-
2971-#ifdef XT_NUMBER_OF_LOGS_TO_SAVE
2972- if (pbxt_crash_debug) {
2973- /* To save the logs, we just consider them in use: */
2974- if (current_log_id > XT_NUMBER_OF_LOGS_TO_SAVE)
2975- current_log_id -= XT_NUMBER_OF_LOGS_TO_SAVE;
2976- else
2977- current_log_id = 1;
2978- }
2979-#endif
2980-
2981- del_log_id = current_log_id - 1;
2982-
2983- while (del_log_id > 0) {
2984- db->db_xlog.xlog_name(PATH_MAX, path, del_log_id);
2985- if (!xt_fs_exists(path))
2986- break;
2987- del_log_id--;
2988- }
2989-
2990- /* This was the lowest log ID that existed: */
2991- del_log_id++;
2992-
2993- /* Delete all logs that still exist, that come before
2994- * the current log:
2995- *
2996- * Do this from least to greatest to ensure no "holes" appear.
2997- */
2998- while (del_log_id < current_log_id) {
2999- switch (db->db_xlog.xlog_delete_log(del_log_id, thread)) {
3000- case OK:
3001- break;
3002- case FAILED:
3003- goto exit_loop;
3004- case XT_ERR:
3005- goto failed_0;
3006- }
3007- del_log_id++;
3008- }
3009- exit_loop:;
3010- }
3011-
3012- /* And we can delete data logs in the list, and place them
3013- * on the deleted list.
3014- */
3015- xtLogID log_id;
3016- for (u_int i=0; i<no_of_logs; i++) {
3017- log_id = (xtLogID) XT_GET_DISK_2(cp_buf->xcp_del_log[i]);
3018- if (!xres_delete_data_log(db, log_id))
3019- goto failed_0;
3020- }
3021-#endif
3022-
3023- xt_free_ns(cp_buf);
3024- cp_buf = NULL;
3025-
3026- checkpoint_done:
3027- cp->cp_running = FALSE;
3028- if (cp->cp_table_ids) {
3029- xt_free_sortedlist(NULL, cp->cp_table_ids);
3030- cp->cp_table_ids = NULL;
3031- }
3032- cp->cp_flush_count = 0;
3033- cp->cp_next_to_flush = 0;
3034- db->db_restart.xres_cp_required = FALSE;
3035- xt_unlock_mutex_ns(&cp->cp_state_lock);
3036- if (checkpoint_done)
3037- *checkpoint_done = TRUE;
3038- return OK;
3039-
3040- failed_2:
3041- xt_close_file_ns(of);
3042-
3043- failed_1:
3044- xt_free_ns(cp_buf); /* NOTE(review): cp_buf is not reset to NULL, so failed_0 below frees it a second time (double free) -- should set cp_buf = NULL here */
3045-
3046- failed_0:
3047- if (cp_buf)
3048- xt_free_ns(cp_buf);
3049- xt_unlock_mutex_ns(&cp->cp_state_lock);
3050- return FAILED;
3051-}
3052-
3053-xtPublic xtWord8 xt_bytes_since_last_checkpoint(XTDatabaseHPtr db, xtLogID curr_log_id, xtLogOffset curr_log_offset)
3054-{ /* Estimate the transaction-log bytes between the last checkpoint position and (curr_log_id, curr_log_offset), counting every intermediate log as xt_db_log_file_threshold bytes. */
3055- xtLogID log_id;
3056- xtLogOffset log_offset;
3057- size_t byte_count = 0; /* NOTE(review): size_t, although the result is xtWord8 -- may truncate on 32-bit builds */
3058-
3059- log_id = db->db_restart.xres_cp_log_id;
3060- log_offset = db->db_restart.xres_cp_log_offset;
3061-
3062- /* Assume the logs have the threshold: */
3063- if (log_id < curr_log_id) {
3064- if (log_offset < xt_db_log_file_threshold)
3065- byte_count = (size_t) (xt_db_log_file_threshold - log_offset);
3066- log_offset = 0;
3067- log_id++;
3068- }
3069- while (log_id < curr_log_id) {
3070- byte_count += (size_t) xt_db_log_file_threshold;
3071- log_id++;
3072- }
3073- if (log_offset < curr_log_offset)
3074- byte_count += (size_t) (curr_log_offset - log_offset);
3075-
3076- return byte_count;
3077-}
3078-
3079-xtPublic void xt_start_checkpointer(XTThreadPtr self, XTDatabaseHPtr db)
3080-{ /* Create and start the checkpointer daemon thread "CP-<last directory of db_main_path>"; xres_cp_free_thread clears db->db_cp_thread when the thread data is freed. */
3081- char name[PATH_MAX];
3082-
3083- sprintf(name, "CP-%s", xt_last_directory_of_path(db->db_main_path)); /* NOTE(review): unbounded write into name[PATH_MAX] -- consider a bounded format */
3084- xt_remove_dir_char(name);
3085- db->db_cp_thread = xt_create_daemon(self, name);
3086- xt_set_thread_data(db->db_cp_thread, db, xres_cp_free_thread);
3087- xt_run_thread(self, db->db_cp_thread, xres_cp_run_thread);
3088-}
3089-
3090-xtPublic void xt_wait_for_checkpointer(XTThreadPtr self, XTDatabaseHPtr db)
3091-{ /* Before shutdown: repeatedly request a checkpoint and wake the checkpointer until the checkpoint covers the writer position and no data logs are pending deletion; gives up after about 16 seconds. */
3092- time_t then, now;
3093- xtBool message = FALSE;
3094- xtLogID log_id;
3095- xtLogOffset log_offset;
3096-
3097- if (db->db_cp_thread) {
3098- then = time(NULL);
3099- for (;;) {
3100- xt_lock_mutex(self, &db->db_wr_lock);
3101- pushr_(xt_unlock_mutex, &db->db_wr_lock);
3102- log_id = db->db_wr_log_id;
3103- log_offset = db->db_wr_log_offset;
3104- freer_(); // xt_unlock_mutex(&db->db_wr_lock)
3105-
3106- if (xt_sl_get_size(db->db_datalogs.dlc_to_delete) == 0 &&
3107- xt_sl_get_size(db->db_datalogs.dlc_deleted) == 0 &&
3108- xt_comp_log_pos(log_id, log_offset, db->db_restart.xres_cp_log_id, db->db_restart.xres_cp_log_offset) <= 0)
3109- break;
3110-
3111- /* Do a final checkpoint before shutdown: */
3112- db->db_restart.xres_cp_required = TRUE;
3113-
3114- xt_lock_mutex(self, &db->db_cp_lock);
3115- pushr_(xt_unlock_mutex, &db->db_cp_lock);
3116- if (!xt_broadcast_cond_ns(&db->db_cp_cond)) {
3117- xt_log_and_clear_exception_ns();
3118- break; /* NOTE(review): leaves the loop without freer_(), so db_cp_lock appears to stay locked and the pushr_ entry unpopped -- confirm */
3119- }
3120- freer_(); // xt_unlock_mutex(&db->db_cp_lock)
3121-
3122- xt_sleep_milli_second(10);
3123-
3124- now = time(NULL);
3125- if (now >= then + 16) {
3126- xt_logf(XT_NT_INFO, "Aborting wait for '%s' checkpointer\n", db->db_name);
3127- message = FALSE;
3128- break;
3129- }
3130- if (now >= then + 2) {
3131- if (!message) {
3132- message = TRUE;
3133- xt_logf(XT_NT_INFO, "Waiting for '%s' checkpointer...\n", db->db_name);
3134- }
3135- }
3136- }
3137-
3138- if (message)
3139- xt_logf(XT_NT_INFO, "Checkpointer '%s' done.\n", db->db_name);
3140- }
3141-}
3142-
3143-xtPublic void xt_stop_checkpointer(XTThreadPtr self, XTDatabaseHPtr db)
3144-{ /* Ask the checkpointer thread to terminate, wake it, wait for it to exit, then clear db->db_cp_thread. */
3145- XTThreadPtr thr_wr;
3146-
3147- if (db->db_cp_thread) {
3148- xt_lock_mutex(self, &db->db_cp_lock);
3149- pushr_(xt_unlock_mutex, &db->db_cp_lock);
3150-
3151- /* This pointer is safe as long as you have the transaction lock. (NOTE(review): the lock held here is db_cp_lock, which xres_cp_free_thread also takes) */
3152- if ((thr_wr = db->db_cp_thread)) {
3153- xtThreadID tid = thr_wr->t_id;
3154-
3155- /* Make sure the thread quits when woken up. */
3156- xt_terminate_thread(self, thr_wr);
3157-
3158- xt_wake_checkpointer(self, db);
3159-
3160- freer_(); // xt_unlock_mutex(&db->db_cp_lock)
3161-
3162- /*
3163- * GOTCHA: This is a wierd thing but the SIGTERM directed
3164- * at a particular thread (in this case the sweeper) was
3165- * being caught by a different thread and killing the server
3166- * sometimes. Disconcerting.
3167- * (this may only be a problem on Mac OS X)
3168- xt_kill_thread(thread);
3169- */
3170- xt_wait_for_thread(tid, FALSE);
3171-
3172- /* PMC - This should not be necessary to set the signal here, but in the
3173- * debugger the handler is not called!!?
3174- thr_wr->t_delayed_signal = SIGTERM;
3175- xt_kill_thread(thread);
3176- */
3177- db->db_cp_thread = NULL; /* NOTE(review): written after db_cp_lock was released above */
3178- }
3179- else
3180- freer_(); // xt_unlock_mutex(&db->db_cp_lock)
3181- }
3182-}
3183-
3184-xtPublic void xt_wake_checkpointer(XTThreadPtr self, XTDatabaseHPtr db)
3185-{ /* Broadcast db->db_cp_cond to wake the checkpointer; a failure is logged and cleared, not propagated. */
3186- if (!xt_broadcast_cond_ns(&db->db_cp_cond))
3187- xt_log_and_clear_exception(self);
3188-}
3189-
3190-xtPublic void xt_free_writer_state(struct XTThread *self, XTWriterStatePtr ws)
3191-{ /* Release the writer state's resources: end the log sequential reader, free the data and record buffers, and return any open table to the pool. */
3192- if (ws->ws_db)
3193- ws->ws_db->db_xlog.xlog_seq_exit(&ws->ws_seqread);
3194- xt_db_set_size(self, &ws->ws_databuf, 0);
3195- xt_ib_free(self, &ws->ws_rec_buf);
3196- if (ws->ws_ot) {
3197- xt_db_return_table_to_pool(self, ws->ws_ot);
3198- ws->ws_ot = NULL;
3199- }
3200-}
3201-
3202-xtPublic void xt_dump_xlogs(XTDatabaseHPtr db, xtLogID start_log)
3203-{ /* Debug aid: print every record of each existing transaction log, starting at the later of start_log and the lowest log that still exists below the checkpoint log, until a log file is missing. */
3204- XTXactSeqReadRec seq;
3205- XTXactLogBufferDPtr record;
3206- xtLogID log_id = db->db_restart.xres_cp_log_id;
3207- char log_path[PATH_MAX];
3208- XTThreadPtr thread = xt_get_self();
3209-
3210- /* Find the first log that still exists:*/
3211- for (;;) {
3212- log_id--;
3213- db->db_xlog.xlog_name(PATH_MAX, log_path, log_id);
3214- if (!xt_fs_exists(log_path))
3215- break;
3216- }
3217- log_id++;
3218-
3219- if (!db->db_xlog.xlog_seq_init(&seq, xt_db_log_buffer_size, FALSE))
3220- return;
3221-
3222- if (log_id < start_log)
3223- log_id = start_log;
3224-
3225- for (;;) {
3226- db->db_xlog.xlog_name(PATH_MAX, log_path, log_id);
3227- if (!xt_fs_exists(log_path))
3228- break;
3229-
3230- if (!db->db_xlog.xlog_seq_start(&seq, log_id, 0, FALSE))
3231- goto done;
3232-
3233- PRINTF("---------- DUMP LOG %d\n", (int) log_id);
3234- for (;;) {
3235- if (!db->db_xlog.xlog_seq_next(&seq, &record, TRUE, thread)) {
3236- PRINTF("---------- DUMP LOG %d ERROR\n", (int) log_id);
3237- xt_log_and_clear_exception_ns();
3238- break;
3239- }
3240- if (!record) {
3241- PRINTF("---------- DUMP LOG %d DONE\n", (int) log_id);
3242- break;
3243- }
3244- xt_print_log_record(seq.xseq_rec_log_id, seq.xseq_rec_log_offset, record);
3245- }
3246-
3247- log_id++;
3248- }
3249-
3250- done:
3251- db->db_xlog.xlog_seq_exit(&seq);
3252-}
3253-
3254-/* ----------------------------------------------------------------------
3255- * D A T A B A S E R E C O V E R Y T H R E A D
3256- */
3257-
3258-extern XTDatabaseHPtr pbxt_database;
3259-
3260-static void *xn_xres_run_recovery_thread(XTThreadPtr self)
3261-{
3262+/* Copyright (c) 2007 PrimeBase Technologies GmbH
3263+ *
3264+ * PrimeBase XT
3265+ *
3266+ * This program is free software; you can redistribute it and/or modify
3267+ * it under the terms of the GNU General Public License as published by
3268+ * the Free Software Foundation; either version 2 of the License, or
3269+ * (at your option) any later version.
3270+ *
3271+ * This program is distributed in the hope that it will be useful,
3272+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
3273+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
3274+ * GNU General Public License for more details.
3275+ *
3276+ * You should have received a copy of the GNU General Public License
3277+ * along with this program; if not, write to the Free Software
3278+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
3279+ *
3280+ * 2007-11-12 Paul McCullagh
3281+ *
3282+ * H&G2JCtL
3283+ *
3284+ * Restart and write data to the database.
3285+ */
3286+
3287+#include "xt_config.h"
3288+
3289+#include <signal.h>
3290+#include <time.h>
3291+
3292+#ifndef DRIZZLED
3293+#include "mysql_priv.h"
3294+#endif
3295+
3296+#include "ha_pbxt.h"
3297+
3298+#include "xactlog_xt.h"
3299+#include "database_xt.h"
3300+#include "util_xt.h"
3301+#include "strutil_xt.h"
3302+#include "filesys_xt.h"
3303+#include "restart_xt.h"
3304+#include "myxt_xt.h"
3305+#include "trace_xt.h"
3306+
3307+#ifdef DEBUG
3308+//#define DEBUG_PRINT
3309+//#define DEBUG_KEEP_LOGS
3310+//#define PRINT_LOG_ON_RECOVERY
3311+//#define TRACE_RECORD_DATA
3312+//#define SKIP_STARTUP_CHECKPOINT
3313+//#define NEVER_CHECKPOINT
3314+//#define TRACE_CHECKPOINT
3315+#endif
3316+
3317+#define PRINTF printf
3318+//#define PRINTF xt_ftracef
3319+//#define PRINTF xt_trace
3320+
3321+void xt_print_bytes(xtWord1 *buf, u_int len)
3322+{ /* Dump len bytes starting at buf as lowercase two-digit hex, each followed by a space. */
3323+ const xtWord1 *stop = buf + len;
3324+
3325+ while (buf != stop)
3326+ PRINTF("%02x ", (u_int) *buf++);
3327+}
3328+
3329+void xt_print_log_record(xtLogID log, xtLogOffset offset, XTXactLogBufferDPtr record)
3330+{ /* Debug aid: print one transaction-log record as a single line via PRINTF. The record position (log, offset) is printed only when log is non-zero; unknown status values print as "UNKNOWN". */
3331+ const char *type = NULL;
3332+ const char *rec_type = NULL;
3333+ xtOpSeqNo op_no = 0;
3334+ xtTableID tab_id = 0;
3335+ xtRowID row_id = 0;
3336+ xtRecordID rec_id = 0;
3337+ xtBool xn_set = FALSE;
3338+ xtXactID xn_id = 0;
3339+ char buffer[200];
3340+ XTTabRecExtDPtr rec_buf;
3341+ XTTabRecExtDPtr ext_rec;
3342+ XTTabRecFixDPtr fix_rec;
3343+ u_int rec_len;
3344+ xtLogID log_id = 0;
3345+ xtLogOffset log_offset = 0;
3346+
3347+ rec_buf = NULL;
3348+ ext_rec = NULL;
3349+ fix_rec = NULL;
3350+ rec_len = 0;
3351+ switch (record->xl.xl_status_1) {
3352+ case XT_LOG_ENT_REC_MODIFIED:
3353+ case XT_LOG_ENT_UPDATE:
3354+ case XT_LOG_ENT_INSERT:
3355+ case XT_LOG_ENT_DELETE:
3356+ case XT_LOG_ENT_UPDATE_BG:
3357+ case XT_LOG_ENT_INSERT_BG:
3358+ case XT_LOG_ENT_DELETE_BG:
3359+ op_no = XT_GET_DISK_4(record->xu.xu_op_seq_4);
3360+ tab_id = XT_GET_DISK_4(record->xu.xu_tab_id_4);
3361+ rec_id = XT_GET_DISK_4(record->xu.xu_rec_id_4);
3362+ xn_id = XT_GET_DISK_4(record->xu.xu_xact_id_4);
3363+ row_id = XT_GET_DISK_4(record->xu.xu_row_id_4);
3364+ rec_len = XT_GET_DISK_2(record->xu.xu_size_2);
3365+ xn_set = TRUE;
3366+ type="rec";
3367+ rec_buf = (XTTabRecExtDPtr) &record->xu.xu_rec_type_1;
3368+ ext_rec = (XTTabRecExtDPtr) &record->xu.xu_rec_type_1;
3369+ if (XT_REC_IS_EXT_DLOG(ext_rec->tr_rec_type_1)) {
3370+ log_id = XT_GET_DISK_2(ext_rec->re_log_id_2);
3371+ log_offset = XT_GET_DISK_6(ext_rec->re_log_offs_6);
3372+ }
3373+ else {
3374+ ext_rec = NULL;
3375+ fix_rec = (XTTabRecFixDPtr) &record->xu.xu_rec_type_1;
3376+ }
3377+ break;
3378+ case XT_LOG_ENT_UPDATE_FL:
3379+ case XT_LOG_ENT_INSERT_FL:
3380+ case XT_LOG_ENT_DELETE_FL:
3381+ case XT_LOG_ENT_UPDATE_FL_BG:
3382+ case XT_LOG_ENT_INSERT_FL_BG:
3383+ case XT_LOG_ENT_DELETE_FL_BG:
3384+ op_no = XT_GET_DISK_4(record->xf.xf_op_seq_4);
3385+ tab_id = XT_GET_DISK_4(record->xf.xf_tab_id_4);
3386+ rec_id = XT_GET_DISK_4(record->xf.xf_rec_id_4);
3387+ xn_id = XT_GET_DISK_4(record->xf.xf_xact_id_4);
3388+ row_id = XT_GET_DISK_4(record->xf.xf_row_id_4);
3389+ rec_len = XT_GET_DISK_2(record->xf.xf_size_2);
3390+ xn_set = TRUE;
3391+ type="rec";
3392+ rec_buf = (XTTabRecExtDPtr) &record->xf.xf_rec_type_1;
3393+ ext_rec = (XTTabRecExtDPtr) &record->xf.xf_rec_type_1;
3394+ if (XT_REC_IS_EXT_DLOG(ext_rec->tr_rec_type_1)) {
3395+ log_id = XT_GET_DISK_2(ext_rec->re_log_id_2);
3396+ log_offset = XT_GET_DISK_6(ext_rec->re_log_offs_6);
3397+ }
3398+ else {
3399+ ext_rec = NULL;
3400+ fix_rec = (XTTabRecFixDPtr) &record->xf.xf_rec_type_1;
3401+ }
3402+ break;
3403+ case XT_LOG_ENT_REC_FREED:
3404+ case XT_LOG_ENT_REC_REMOVED:
3405+ case XT_LOG_ENT_REC_REMOVED_EXT:
3406+ op_no = XT_GET_DISK_4(record->fr.fr_op_seq_4);
3407+ tab_id = XT_GET_DISK_4(record->fr.fr_tab_id_4);
3408+ rec_id = XT_GET_DISK_4(record->fr.fr_rec_id_4);
3409+ xn_id = XT_GET_DISK_4(record->fr.fr_xact_id_4);
3410+ xn_set = TRUE;
3411+ type="rec";
3412+ break;
3413+ case XT_LOG_ENT_REC_REMOVED_BI:
3414+ op_no = XT_GET_DISK_4(record->rb.rb_op_seq_4);
3415+ tab_id = XT_GET_DISK_4(record->rb.rb_tab_id_4);
3416+ rec_id = XT_GET_DISK_4(record->rb.rb_rec_id_4);
3417+ xn_id = XT_GET_DISK_4(record->rb.rb_xact_id_4);
3418+ row_id = XT_GET_DISK_4(record->rb.rb_row_id_4);
3419+ rec_len = XT_GET_DISK_2(record->rb.rb_size_2);
3420+ xn_set = TRUE;
3421+ type="rec";
3422+ rec_buf = (XTTabRecExtDPtr) &record->rb.rb_rec_type_1;
3423+ ext_rec = (XTTabRecExtDPtr) &record->rb.rb_rec_type_1;
3424+ if (XT_REC_IS_EXT_DLOG(record->rb.rb_rec_type_1)) {
3425+ log_id = XT_GET_DISK_2(ext_rec->re_log_id_2);
3426+ log_offset = XT_GET_DISK_6(ext_rec->re_log_offs_6);
3427+ }
3428+ else {
3429+ ext_rec = NULL;
3430+ fix_rec = (XTTabRecFixDPtr) &record->rb.rb_rec_type_1;
3431+ }
3432+ break;
3433+ case XT_LOG_ENT_REC_MOVED:
3434+ op_no = XT_GET_DISK_4(record->xw.xw_op_seq_4);
3435+ tab_id = XT_GET_DISK_4(record->xw.xw_tab_id_4);
3436+ rec_id = XT_GET_DISK_4(record->xw.xw_rec_id_4);
3437+ log_id = XT_GET_DISK_2(&record->xw.xw_rec_type_1); // This is actually correct
3438+ log_offset = XT_GET_DISK_6(record->xw.xw_next_rec_id_4); // This is actually correct!
3439+ type="rec";
3440+ break;
3441+ case XT_LOG_ENT_REC_CLEANED:
3442+ case XT_LOG_ENT_REC_CLEANED_1:
3443+ case XT_LOG_ENT_REC_UNLINKED:
3444+ op_no = XT_GET_DISK_4(record->xw.xw_op_seq_4);
3445+ tab_id = XT_GET_DISK_4(record->xw.xw_tab_id_4);
3446+ rec_id = XT_GET_DISK_4(record->xw.xw_rec_id_4);
3447+ type="rec";
3448+ break;
3449+ case XT_LOG_ENT_ROW_NEW:
3450+ case XT_LOG_ENT_ROW_NEW_FL:
3451+ case XT_LOG_ENT_ROW_ADD_REC:
3452+ case XT_LOG_ENT_ROW_SET:
3453+ case XT_LOG_ENT_ROW_FREED:
3454+ op_no = XT_GET_DISK_4(record->xa.xa_op_seq_4);
3455+ tab_id = XT_GET_DISK_4(record->xa.xa_tab_id_4);
3456+ rec_id = XT_GET_DISK_4(record->xa.xa_row_id_4);
3457+ type="row";
3458+ break;
3459+ case XT_LOG_ENT_NO_OP:
3460+ op_no = XT_GET_DISK_4(record->no.no_op_seq_4);
3461+ tab_id = XT_GET_DISK_4(record->no.no_tab_id_4);
3462+ type="-";
3463+ break;
3464+ case XT_LOG_ENT_END_OF_LOG:
3465+ break;
3466+ }
3467+
3468+ switch (record->xl.xl_status_1) {
3469+ case XT_LOG_ENT_HEADER:
3470+ rec_type = "HEADER";
3471+ break;
3472+ case XT_LOG_ENT_NEW_LOG:
3473+ rec_type = "NEW LOG";
3474+ break;
3475+ case XT_LOG_ENT_DEL_LOG:
3476+ sprintf(buffer, "DEL LOG log=%d ", (int) XT_GET_DISK_4(record->xl.xl_log_id_4));
3477+ rec_type = buffer;
3478+ break;
3479+ case XT_LOG_ENT_NEW_TAB:
3480+ rec_type = "NEW TABLE";
3481+ break;
3482+ case XT_LOG_ENT_COMMIT:
3483+ rec_type = "COMMIT";
3484+ xn_id = XT_GET_DISK_4(record->xe.xe_xact_id_4);
3485+ xn_set = TRUE;
3486+ break;
3487+ case XT_LOG_ENT_ABORT:
3488+ rec_type = "ABORT";
3489+ xn_id = XT_GET_DISK_4(record->xe.xe_xact_id_4);
3490+ xn_set = TRUE;
3491+ break;
3492+ case XT_LOG_ENT_CLEANUP:
3493+ rec_type = "CLEANUP";
3494+ xn_id = XT_GET_DISK_4(record->xc.xc_xact_id_4);
3495+ xn_set = TRUE;
3496+ break;
3497+ case XT_LOG_ENT_REC_MODIFIED:
3498+ rec_type = "MODIFIED";
3499+ break;
3500+ case XT_LOG_ENT_UPDATE:
3501+ rec_type = "UPDATE";
3502+ break;
3503+ case XT_LOG_ENT_UPDATE_FL:
3504+ rec_type = "UPDATE-FL";
3505+ break;
3506+ case XT_LOG_ENT_INSERT:
3507+ rec_type = "INSERT";
3508+ break;
3509+ case XT_LOG_ENT_INSERT_FL:
3510+ rec_type = "INSERT-FL";
3511+ break;
3512+ case XT_LOG_ENT_DELETE:
3513+ rec_type = "DELETE";
3514+ break;
3515+ case XT_LOG_ENT_DELETE_FL:
3516+ rec_type = "DELETE-FL";
3517+ break;
3518+ case XT_LOG_ENT_UPDATE_BG:
3519+ rec_type = "UPDATE-BG";
3520+ break;
3521+ case XT_LOG_ENT_UPDATE_FL_BG:
3522+ rec_type = "UPDATE-FL-BG";
3523+ break;
3524+ case XT_LOG_ENT_INSERT_BG:
3525+ rec_type = "INSERT-BG";
3526+ break;
3527+ case XT_LOG_ENT_INSERT_FL_BG:
3528+ rec_type = "INSERT-FL-BG";
3529+ break;
3530+ case XT_LOG_ENT_DELETE_BG:
3531+ rec_type = "DELETE-BG";
3532+ break;
3533+ case XT_LOG_ENT_DELETE_FL_BG:
3534+ rec_type = "DELETE-FL-BG";
3535+ break;
3536+ case XT_LOG_ENT_REC_FREED:
3537+ rec_type = "FREE REC";
3538+ break;
3539+ case XT_LOG_ENT_REC_REMOVED:
3540+ rec_type = "REMOVED REC";
3541+ break;
3542+ case XT_LOG_ENT_REC_REMOVED_EXT:
3543+ rec_type = "REMOVED-X REC";
3544+ break;
3545+ case XT_LOG_ENT_REC_REMOVED_BI:
3546+ rec_type = "REMOVED-BI REC";
3547+ break;
3548+ case XT_LOG_ENT_REC_MOVED:
3549+ rec_type = "MOVED REC";
3550+ break;
3551+ case XT_LOG_ENT_REC_CLEANED:
3552+ rec_type = "CLEAN REC";
3553+ break;
3554+ case XT_LOG_ENT_REC_CLEANED_1:
3555+ rec_type = "CLEAN REC-1";
3556+ break;
3557+ case XT_LOG_ENT_REC_UNLINKED:
3558+ rec_type = "UNLINK REC";
3559+ break;
3560+ case XT_LOG_ENT_ROW_NEW:
3561+ rec_type = "NEW ROW";
3562+ break;
3563+ case XT_LOG_ENT_ROW_NEW_FL:
3564+ rec_type = "NEW ROW-FL";
3565+ break;
3566+ case XT_LOG_ENT_ROW_ADD_REC:
3567+ rec_type = "REC ADD ROW";
3568+ break;
3569+ case XT_LOG_ENT_ROW_SET:
3570+ rec_type = "SET ROW";
3571+ break;
3572+ case XT_LOG_ENT_ROW_FREED:
3573+ rec_type = "FREE ROW";
3574+ break;
3575+ case XT_LOG_ENT_OP_SYNC:
3576+ rec_type = "OP SYNC";
3577+ break;
3578+ case XT_LOG_ENT_NO_OP:
3579+ rec_type = "NO OP";
3580+ break;
3581+ case XT_LOG_ENT_END_OF_LOG:
3582+ rec_type = "END OF LOG";
3583+ break;
3584+ }
3585+
3586+ if (log)
3587+ PRINTF("log=%d offset=%llu ", (int) log, (u_llong) offset); /* offsets are 64-bit: do not truncate through int */
3588+ PRINTF("%s ", rec_type ? rec_type : "UNKNOWN"); /* rec_type stays NULL for status values not listed above; passing NULL to %s is undefined */
3589+ if (type)
3590+ PRINTF("op=%lu tab=%lu %s=%lu ", (u_long) op_no, (u_long) tab_id, type, (u_long) rec_id);
3591+ if (row_id)
3592+ PRINTF("row=%lu ", (u_long) row_id);
3593+ if (log_id)
3594+ PRINTF("log=%lu offset=%llu ", (u_long) log_id, (u_llong) log_offset);
3595+ if (xn_set)
3596+ PRINTF("xact=%lu ", (u_long) xn_id);
3597+
3598+#ifdef TRACE_RECORD_DATA
3599+ if (rec_buf) {
3600+ switch (rec_buf->tr_rec_type_1 & XT_TAB_STATUS_MASK) {
3601+ case XT_TAB_STATUS_FREED:
3602+ PRINTF("FREE");
3603+ break;
3604+ case XT_TAB_STATUS_DELETE:
3605+ PRINTF("DELE");
3606+ break;
3607+ case XT_TAB_STATUS_FIXED:
3608+ PRINTF("FIX-");
3609+ break;
3610+ case XT_TAB_STATUS_VARIABLE:
3611+ PRINTF("VAR-");
3612+ break;
3613+ case XT_TAB_STATUS_EXT_DLOG:
3614+ PRINTF("EXT-");
3615+ break;
3616+ }
3617+ if (rec_buf->tr_rec_type_1 & XT_TAB_STATUS_CLEANED_BIT)
3618+ PRINTF("C");
3619+ else
3620+ PRINTF(" ");
3621+ }
3622+ if (ext_rec) {
3623+ rec_len = (rec_len > (u_int) offsetof(XTTabRecExtDRec, re_data)) ? rec_len - (u_int) offsetof(XTTabRecExtDRec, re_data) : 0; /* avoid u_int underflow on short records */
3624+ xt_print_bytes((xtWord1 *) ext_rec, offsetof(XTTabRecExtDRec, re_data));
3625+ PRINTF("| ");
3626+ if (rec_len > 20)
3627+ rec_len = 20;
3628+ xt_print_bytes(ext_rec->re_data, rec_len);
3629+ }
3630+ if (fix_rec) {
3631+ rec_len = (rec_len > (u_int) offsetof(XTTabRecFixDRec, rf_data)) ? rec_len - (u_int) offsetof(XTTabRecFixDRec, rf_data) : 0; /* avoid u_int underflow on short records */
3632+ xt_print_bytes((xtWord1 *) fix_rec, offsetof(XTTabRecFixDRec, rf_data));
3633+ PRINTF("| ");
3634+ if (rec_len > 20)
3635+ rec_len = 20;
3636+ xt_print_bytes(fix_rec->rf_data, rec_len);
3637+ }
3638+#endif
3639+
3640+ PRINTF("\n");
3641+}
3642+
3643+#ifdef DEBUG_PRINT
3644+/* Debug aid: read the whole of ./test/test_tab-1.xtr (handle opened once and cached) and print the index of every 8-byte entry that is zero. */
3645+void check_rows(void)
3646+{
3647+ static XTOpenFilePtr of = NULL;
3648+
3649+ if (!of)
3650+ of = xt_open_file_ns("./test/test_tab-1.xtr", XT_FS_DEFAULT);
3651+ if (of) {
3652+ size_t size = (size_t) xt_seek_eof_file(NULL, of);
3653+ xtWord8 *buffer = (xtWord8 *) xt_malloc_ns(size);
3654+
3655+ if (!buffer)
3656+ return;
3657+ xt_pread_file(of, 0, size, size, buffer, NULL);
3658+ for (size_t i=0; i<size/8; i++) {
3659+ if (!buffer[i])
3660+ printf("%d is NULL\n", (int) i);
3661+ }
3662+ /* The buffer used to be leaked on every call. */
3663+ xt_free_ns(buffer);
3664+ }
3665+}
3666+
3667+#endif
3662+
3663+/* ----------------------------------------------------------------------
3664+ * APPLYING CHANGES IN SEQUENCE
3665+ */
3666+
3667+typedef struct XTOperation { /* An operation queued on a table's tab_op_list (sorted by xres_cmp_op_seq). */
3668+ xtOpSeqNo or_op_seq; /* operation sequence number: the sort key of tab_op_list */
3669+ xtWord4 or_op_len; /* NOTE(review): presumably the length of the logged operation -- confirm */
3670+ xtLogID or_log_id; /* log position of the operation; xt_begin_checkpoint() uses the first */
3671+ xtLogOffset or_log_offset; /* queued operation's position as a lower bound for the restart point */
3672+} XTOperationRec, *XTOperationPtr;
3673+
3674+static int xres_cmp_op_seq(struct XTThread *XT_UNUSED(self), register const void *XT_UNUSED(thunk), register const void *a, register const void *b)
3675+{ /* Sorted-list comparator: a points to a key sequence number, b to an XTOperationRec. Ordering is decided by XTTableSeq::xt_op_is_before rather than a plain '<'. */
3676+ xtOpSeqNo key_seq = *((const xtOpSeqNo *) a);
3677+ xtOpSeqNo item_seq = ((const XTOperationRec *) b)->or_op_seq;
3678+
3679+ if (key_seq != item_seq)
3680+ return XTTableSeq::xt_op_is_before(key_seq, item_seq) ? -1 : 1;
3681+ return 0;
3682+}
3685+
3686+xtPublic void xt_xres_init_tab(XTThreadPtr self, XTTableHPtr tab)
3687+{ /* Create the table's queued-operation list, ordered by xres_cmp_op_seq. */
3688+ XTSortedListPtr op_list = xt_new_sortedlist(self, sizeof(XTOperationRec), 20, 1000, xres_cmp_op_seq, NULL, NULL, TRUE, FALSE);
3689+
3690+ tab->tab_op_list = op_list;
3691+}
3690+
3691+xtPublic void xt_xres_exit_tab(XTThreadPtr self, XTTableHPtr tab)
3692+{ /* Free the table's queued-operation list, if any; the pointer is cleared so a repeated call does nothing. */
3693+ if (!tab->tab_op_list)
3694+ return;
3695+ xt_free_sortedlist(self, tab->tab_op_list);
3696+ tab->tab_op_list = NULL;
3697+}
3698+
3699+static xtBool xres_open_table(XTThreadPtr self, XTWriterStatePtr ws, xtTableID tab_id)
3700+{ /* Make ws->ws_ot an open handle on table tab_id, reusing the current handle when it already refers to that table. Returns FAILED if the table cannot be opened. */
3701+ XTOpenTablePtr cur_ot = ws->ws_ot;
3702+ XTTableHPtr tab;
3703+
3704+ if (cur_ot) {
3705+ if (cur_ot->ot_table->tab_id == tab_id)
3706+ return OK;
3707+ /* A different table is open: give it back to the pool first. */
3708+ xt_db_return_table_to_pool(self, cur_ot);
3709+ ws->ws_ot = NULL;
3710+ }
3711+
3712+ /* ws_tab_gone remembers the last table that failed to open, so it is not retried. */
3713+ if (ws->ws_tab_gone == tab_id)
3714+ return FAILED;
3715+ ws->ws_ot = xt_db_open_pool_table(self, ws->ws_db, tab_id, NULL, TRUE);
3716+ if (!ws->ws_ot) {
3717+ ws->ws_tab_gone = tab_id;
3718+ return FAILED;
3719+ }
3720+
3721+ tab = ws->ws_ot->ot_table;
3722+ if (!tab->tab_ind_rec_log_id) {
3723+ /* Should not happen: fall back to the writer's index recovery position. */
3724+ tab->tab_ind_rec_log_id = ws->ws_ind_rec_log_id;
3725+ tab->tab_ind_rec_log_offset = ws->ws_ind_rec_log_offset;
3726+ }
3727+ return OK;
3728+}
3726+
3727+/* {INDEX-RECOV_ROWID}
3728+ * Add missing index entries during recovery.
3729+ * Set the row ID even if the index entry
3730+ * is not committed. It will be removed later by
3731+ * the sweeper.
3732+ *
3733+ * Returns FAILED (with ot->ot_err_index_no set to the failing index) only
3734+ * for system errors classed as file in use, access denied, too many open
3735+ * files, or out of memory. Any other insert error disables the table's
3736+ * indexes (XT_INDEX_CORRUPTED), is logged, and OK is returned.
3737+ */
3738+static xtBool xres_add_index_entries(XTOpenTablePtr ot, xtRowID row_id, xtRecordID rec_id, xtWord1 *rec_data)
3739+{
3740+ XTTableHPtr tab = ot->ot_table;
3741+ XTIndexPtr index;
3742+ XTThreadPtr self;
3743+ xtBool transient;
3744+
3745+ if (tab->tab_dic.dic_disable_index)
3746+ return OK;
3747+
3748+ for (u_int k = 0; k < tab->tab_dic.dic_key_count; k++) {
3749+ index = tab->tab_dic.dic_keys[k];
3750+ if (xt_idx_insert(ot, index, row_id, rec_id, rec_data, NULL, TRUE))
3751+ continue;
3752+
3753+ /* Check the error, certain errors are recoverable! */
3754+ self = xt_get_self();
3755+ transient = self->t_exception.e_xt_err == XT_SYSTEM_ERROR &&
3756+ (XT_FILE_IN_USE(self->t_exception.e_sys_err) ||
3757+ XT_FILE_ACCESS_DENIED(self->t_exception.e_sys_err) ||
3758+ XT_FILE_TOO_MANY_OPEN(self->t_exception.e_sys_err) ||
3759+ self->t_exception.e_sys_err == XT_ENOMEM);
3760+ if (!transient) {
3761+ /* TODO: Write something to the index header to indicate that
3762+ * it is corrupted.
3763+ */
3764+ tab->tab_dic.dic_disable_index = XT_INDEX_CORRUPTED;
3765+ xt_log_and_clear_exception_ns();
3766+ return OK;
3767+ }
3768+ ot->ot_err_index_no = index->mi_index_no;
3769+ return FAILED;
3770+ }
3771+ return OK;
3772+}
3767+
3768+static void xres_remove_index_entries(XTOpenTablePtr ot, xtRecordID rec_id, xtWord1 *rec_data)
3769+{
3770+ XTTableHPtr tab = ot->ot_table;
3771+ u_int idx_cnt;
3772+ XTIndexPtr *ind;
3773+
3774+ if (tab->tab_dic.dic_disable_index)
3775+ return;
3776+
3777+ for (idx_cnt=0, ind=tab->tab_dic.dic_keys; idx_cnt<tab->tab_dic.dic_key_count; idx_cnt++, ind++) {
3778+ if (!xt_idx_delete(ot, *ind, rec_id, rec_data))
3779+ xt_log_and_clear_exception_ns();
3780+ }
3781+}
3782+
3783+static xtWord1 *xres_load_record(XTThreadPtr self, XTOpenTablePtr ot, xtRecordID rec_id, xtWord1 *data, size_t red_size, XTInfoBufferPtr rec_buf, u_int cols_req)
3784+{
3785+ XTTableHPtr tab = ot->ot_table;
3786+ xtWord1 *rec_data;
3787+
3788+ rec_data = ot->ot_row_rbuffer;
3789+
3790+ ASSERT(red_size <= ot->ot_row_rbuf_size);
3791+ ASSERT(tab->tab_dic.dic_rec_size <= ot->ot_row_rbuf_size);
3792+ if (data) {
3793+ if (rec_data != data)
3794+ memcpy(rec_data, data, red_size);
3795+ }
3796+ else {
3797+ /* It can be that less than 'dic_rec_size' was written for
3798+ * variable length type records.
3799+ * If this is the last record in the file, then we will read
3800+ * less than actual record size.
3801+ */
3802+ if (!XT_PREAD_RR_FILE(ot->ot_rec_file, xt_rec_id_to_rec_offset(tab, rec_id), tab->tab_dic.dic_rec_size, 0, rec_data, &red_size, &self->st_statistics.st_rec, self))
3803+ goto failed;
3804+
3805+ if (red_size < sizeof(XTTabRecHeadDRec))
3806+ return NULL;
3807+ }
3808+
3809+ if (XT_REC_IS_FIXED(rec_data[0]))
3810+ rec_data = ot->ot_row_rbuffer + XT_REC_FIX_HEADER_SIZE;
3811+ else {
3812+ if (!xt_ib_alloc(NULL, rec_buf, tab->tab_dic.dic_mysql_buf_size))
3813+ goto failed;
3814+ if (XT_REC_IS_VARIABLE(rec_data[0])) {
3815+ if (!myxt_load_row(ot, rec_data + XT_REC_FIX_HEADER_SIZE, rec_buf->ib_db.db_data, cols_req))
3816+ goto failed;
3817+ }
3818+ else if (XT_REC_IS_EXT_DLOG(rec_data[0])) {
3819+ if (red_size < XT_REC_EXT_HEADER_SIZE)
3820+ return NULL;
3821+
3822+ ASSERT(cols_req);
3823+ if (cols_req && cols_req <= tab->tab_dic.dic_fix_col_count) {
3824+ if (!myxt_load_row(ot, rec_data + XT_REC_EXT_HEADER_SIZE, rec_buf->ib_db.db_data, cols_req))
3825+ goto failed;
3826+ }
3827+ else {
3828+ if (!xt_tab_load_ext_data(ot, rec_id, rec_buf->ib_db.db_data, cols_req))
3829+ goto failed;
3830+ }
3831+ }
3832+ else
3833+ /* This is possible, the record has already been cleaned up. */
3834+ return NULL;
3835+ rec_data = rec_buf->ib_db.db_data;
3836+ }
3837+
3838+ return rec_data;
3839+
3840+ failed:
3841+ /* Running out of memory should not be ignored. */
3842+ if (self->t_exception.e_xt_err == XT_SYSTEM_ERROR &&
3843+ self->t_exception.e_sys_err == XT_ENOMEM)
3844+ xt_throw(self);
3845+ xt_log_and_clear_exception_ns();
3846+ return NULL;
3847+}
3848+
3849+/*
3850+ * Apply a change from the log.
3851+ *
3852+ * This function is basically very straight forward, were it not
3853+ * for the option to apply operations out of sequence.
3854+ * (i.e. in_sequence == FALSE)
3855+ *
3856+ * If operations are applied in sequence, then they can be
3857+ * applied blindly. The update operation is just executed as
3858+ * it was logged.
3859+ *
3860+ * If the changes are not in sequence, then some operation are missing,
3861+ * however, the operations that are present are in the correct order.
3862+ *
3863+ * This can only happen at the end of recovery!!!
3864+ * After we have applied all operations in the log we may be
3865+ * left with some operations that have not been applied
3866+ * because operations were logged out of sequence.
3867+ *
3868+ * The application of these operations there has to take into
3869+ * account the current state of the database.
3870+ * They are then applied in a manner that maintains the
3871+ * database consistency.
3872+ *
3873+ * For example, a record that is freed, is free by placing it
3874+ * on the current free list. Part of the data logged for the
3875+ * operation is ignored. Namely: the "next block" pointer
3876+ * that was originally written into the freed record.
3877+ */
3878+static void xres_apply_change(XTThreadPtr self, XTOpenTablePtr ot, XTXactLogBufferDPtr record, xtBool in_sequence, xtBool check_index, XTInfoBufferPtr rec_buf)
3879+{
3880+ XTTableHPtr tab = ot->ot_table;
3881+ size_t len;
3882+ xtRecordID rec_id;
3883+ xtRefID free_ref_id;
3884+ XTTabRecFreeDRec free_rec;
3885+ xtRowID row_id;
3886+ XTTabRowRefDRec row_buf;
3887+ XTTabRecHeadDRec rec_head;
3888+ size_t tfer;
3889+ xtRecordID link_rec_id, prev_link_rec_id;
3890+ xtWord1 *rec_data = NULL;
3891+ XTTabRecFreeDPtr free_data;
3892+
3893+ switch (record->xl.xl_status_1) {
3894+ case XT_LOG_ENT_REC_MODIFIED:
3895+ case XT_LOG_ENT_UPDATE:
3896+ case XT_LOG_ENT_INSERT:
3897+ case XT_LOG_ENT_DELETE:
3898+ case XT_LOG_ENT_UPDATE_BG:
3899+ case XT_LOG_ENT_INSERT_BG:
3900+ case XT_LOG_ENT_DELETE_BG:
3901+ rec_id = XT_GET_DISK_4(record->xu.xu_rec_id_4);
3902+
3903+ /* This should be done before we apply change to table, as otherwise we lose
3904+ * the key value that we need to remove from index
3905+ */
3906+ if (check_index && ot->ot_table->tab_dic.dic_key_count && record->xl.xl_status_1 == XT_LOG_ENT_REC_MODIFIED) {
3907+ if ((rec_data = xres_load_record(self, ot, rec_id, NULL, 0, rec_buf, tab->tab_dic.dic_ind_cols_req)))
3908+ xres_remove_index_entries(ot, rec_id, rec_data);
3909+ }
3910+
3911+ len = (size_t) XT_GET_DISK_2(record->xu.xu_size_2);
3912+ if (!XT_PWRITE_RR_FILE(ot->ot_rec_file, xt_rec_id_to_rec_offset(tab, rec_id), len, (xtWord1 *) &record->xu.xu_rec_type_1, &ot->ot_thread->st_statistics.st_rec, ot->ot_thread))
3913+ xt_throw(self);
3914+ tab->tab_bytes_to_flush += len;
3915+
3916+ if (check_index && ot->ot_table->tab_dic.dic_key_count) {
3917+ switch (record->xl.xl_status_1) {
3918+ case XT_LOG_ENT_DELETE:
3919+ case XT_LOG_ENT_DELETE_BG:
3920+ break;
3921+ default:
3922+ if ((rec_data = xres_load_record(self, ot, rec_id, &record->xu.xu_rec_type_1, len, rec_buf, tab->tab_dic.dic_ind_cols_req))) {
3923+ row_id = XT_GET_DISK_4(record->xu.xu_row_id_4);
3924+ if (!xres_add_index_entries(ot, row_id, rec_id, rec_data))
3925+ xt_throw(self);
3926+ }
3927+ break;
3928+ }
3929+ }
3930+
3931+ if (!in_sequence) {
3932+ /* A record has been allocated from the EOF, but out of sequence.
3933+ * This could leave a gap where other records were allocated
3934+ * from the EOF, but those operations have been lost!
3935+ * We compensate for this by adding all blocks between
3936+ * to the free list.
3937+ */
3938+ free_rec.rf_rec_type_1 = XT_TAB_STATUS_FREED;
3939+ free_rec.rf_not_used_1 = 0;
3940+ while (tab->tab_head_rec_eof_id < rec_id) {
3941+ XT_SET_DISK_4(free_rec.rf_next_rec_id_4, tab->tab_head_rec_free_id);
3942+ if (!XT_PWRITE_RR_FILE(ot->ot_rec_file, tab->tab_head_rec_eof_id, sizeof(XTTabRecFreeDRec), (xtWord1 *) &free_rec, &ot->ot_thread->st_statistics.st_rec, ot->ot_thread))
3943+ xt_throw(self);
3944+ tab->tab_bytes_to_flush += sizeof(XTTabRecFreeDRec);
3945+ tab->tab_head_rec_free_id = tab->tab_head_rec_eof_id;
3946+ tab->tab_head_rec_eof_id++;
3947+ }
3948+ }
3949+ if (tab->tab_head_rec_eof_id < rec_id + 1)
3950+ tab->tab_head_rec_eof_id = rec_id + 1;
3951+ tab->tab_flush_pending = TRUE;
3952+ break;
3953+ case XT_LOG_ENT_UPDATE_FL:
3954+ case XT_LOG_ENT_INSERT_FL:
3955+ case XT_LOG_ENT_DELETE_FL:
3956+ case XT_LOG_ENT_UPDATE_FL_BG:
3957+ case XT_LOG_ENT_INSERT_FL_BG:
3958+ case XT_LOG_ENT_DELETE_FL_BG:
3959+ rec_id = XT_GET_DISK_4(record->xf.xf_rec_id_4);
3960+ len = (size_t) XT_GET_DISK_2(record->xf.xf_size_2);
3961+ free_ref_id = XT_GET_DISK_4(record->xf.xf_free_rec_id_4);
3962+
3963+ if (check_index &&
3964+ record->xf.xf_status_1 != XT_LOG_ENT_DELETE_FL &&
3965+ record->xf.xf_status_1 != XT_LOG_ENT_DELETE_FL_BG) {
3966+ if ((rec_data = xres_load_record(self, ot, rec_id, &record->xf.xf_rec_type_1, len, rec_buf, tab->tab_dic.dic_ind_cols_req))) {
3967+ row_id = XT_GET_DISK_4(record->xf.xf_row_id_4);
3968+ if (!xres_add_index_entries(ot, row_id, rec_id, rec_data))
3969+ xt_throw(self);
3970+ }
3971+ }
3972+
3973+ if (!in_sequence) {
3974+ /* This record was allocated from the free list.
3975+ * Because this operation is out of sequence, there
3976+ * could have been other allocations from the
3977+ * free list before this, that have gone missing.
3978+ * For this reason we have to search the current
3979+ * free list and remove the record.
3980+ */
3981+ link_rec_id = tab->tab_head_rec_free_id;
3982+ prev_link_rec_id = 0;
3983+ while (link_rec_id) {
3984+ if (!XT_PREAD_RR_FILE(ot->ot_rec_file, xt_rec_id_to_rec_offset(tab, link_rec_id), sizeof(XTTabRecFreeDRec), sizeof(XTTabRecFreeDRec), (xtWord1 *) &free_rec, NULL, &self->st_statistics.st_rec, self))
3985+ xt_throw(self);
3986+ if (link_rec_id == rec_id)
3987+ break;
3988+ prev_link_rec_id = link_rec_id;
3989+ link_rec_id = XT_GET_DISK_4(free_rec.rf_next_rec_id_4);
3990+ }
3991+ if (link_rec_id == rec_id) {
3992+ /* The block was found on the free list.
3993+ * remove it: */
3994+ if (prev_link_rec_id) {
3995+ /* We write the record from position 'link_rec_id' into
3996+ * position 'prev_link_rec_id'. This unlinks 'link_rec_id'!
3997+ */
3998+ if (!XT_PWRITE_RR_FILE(ot->ot_rec_file, xt_rec_id_to_rec_offset(tab, prev_link_rec_id), sizeof(XTTabRecFreeDRec), (xtWord1 *) &free_rec, &ot->ot_thread->st_statistics.st_rec, ot->ot_thread))
3999+ xt_throw(self);
4000+ tab->tab_bytes_to_flush += sizeof(XTTabRecFreeDRec);
4001+ free_ref_id = tab->tab_head_rec_free_id;
4002+ }
4003+ else
4004+ /* The block is at the front of the list: */
4005+ free_ref_id = XT_GET_DISK_4(free_rec.rf_next_rec_id_4);
4006+ }
4007+ else {
4008+ /* Not found on the free list? */
4009+ if (tab->tab_head_rec_eof_id < rec_id + 1)
4010+ tab->tab_head_rec_eof_id = rec_id + 1;
4011+ goto write_mod_data;
4012+ }
4013+ }
4014+ if (tab->tab_head_rec_eof_id < rec_id + 1)
4015+ tab->tab_head_rec_eof_id = rec_id + 1;
4016+ tab->tab_head_rec_free_id = free_ref_id;
4017+ tab->tab_head_rec_fnum--;
4018+ write_mod_data:
4019+ if (!XT_PWRITE_RR_FILE(ot->ot_rec_file, xt_rec_id_to_rec_offset(tab, rec_id), len, (xtWord1 *) &record->xf.xf_rec_type_1, &ot->ot_thread->st_statistics.st_rec, ot->ot_thread))
4020+ xt_throw(self);
4021+ tab->tab_bytes_to_flush += len;
4022+ tab->tab_flush_pending = TRUE;
4023+ break;
4024+ case XT_LOG_ENT_REC_REMOVED:
4025+ case XT_LOG_ENT_REC_REMOVED_EXT: {
4026+ xtBool record_loaded;
4027+ XTTabRecExtDPtr ext_rec;
4028+ size_t red_size;
4029+ xtWord4 log_over_size = 0;
4030+ xtLogID data_log_id = 0;
4031+ xtLogOffset data_log_offset = 0;
4032+ u_int cols_required = 0;
4033+
4034+ rec_id = XT_GET_DISK_4(record->fr.fr_rec_id_4);
4035+ free_data = (XTTabRecFreeDPtr) &record->fr.fr_rec_type_1;
4036+
4037+ /* This is a short-cut, it does not require loading the record: */
4038+ if (!check_index && !tab->tab_dic.dic_blob_count && record->fr.fr_status_1 != XT_LOG_ENT_REC_REMOVED_EXT)
4039+ goto do_rec_freed;
4040+
4041+ ext_rec = (XTTabRecExtDPtr) ot->ot_row_rbuffer;
4042+
4043+ if (!XT_PREAD_RR_FILE(ot->ot_rec_file, xt_rec_id_to_rec_offset(tab, rec_id), tab->tab_dic.dic_rec_size, 0, ext_rec, &red_size, &self->st_statistics.st_rec, self)) {
4044+ xt_log_and_clear_exception_ns();
4045+ goto do_rec_freed;
4046+ }
4047+
4048+ if (red_size < sizeof(XTTabRecHeadDRec))
4049+ goto do_rec_freed;
4050+
4051+ /* Check that the record is the same as the one originally removed.
4052+ * This can be different if recovery is repeated.
4053+ * For example:
4054+ *
4055+ * log=21 offset=6304472 REMOVED-X REC op=360616 tab=7 rec=25874
4056+ * log=21 offset=6309230 UPDATE-FL op=360618 tab=7 rec=25874 row=26667 log=1 offset=26503077 xact=209
4057+ * log=21 offset=6317500 CLEAN REC op=360631 tab=7 rec=25874
4058+ *
4059+ * If this recovery sequence is repeated, then the REMOVED-X will free the
4060+ * extended record belonging to the update that came afterwards!
4061+ *
4062+ * Additional situation to consider:
4063+ *
4064+ * - A record "x" is created, and index entries created.
4065+ * - A checkpoint is made done.
4066+ * - Record "x" is deleted due to UPDATE.
4067+ * - The index entries are removed, but the index is not
4068+ * flushed.
4069+ * - This deletion is written to disk by the writer.
4070+ * So we have the situation that the remove is on disk,
4071+ * but the index changes have not been made.
4072+ *
4073+ * In this case, skipping to "do_rec_freed" is incorrect.
4074+ */
4075+ if (record->fr.fr_stat_id_1 != ext_rec->tr_stat_id_1 ||
4076+ XT_GET_DISK_4(record->fr.fr_xact_id_4) != XT_GET_DISK_4(ext_rec->tr_xact_id_4))
4077+ goto dont_remove_x_record;
4078+
4079+ if (record->xl.xl_status_1 == XT_LOG_ENT_REC_REMOVED_EXT) {
4080+ if (!XT_REC_IS_EXT_DLOG(ext_rec->tr_rec_type_1))
4081+ goto dont_remove_x_record;
4082+ if (red_size < offsetof(XTTabRecExtDRec, re_data))
4083+ goto dont_remove_x_record;
4084+
4085+ /* Save this for later (can be overwritten by xres_load_record(): */
4086+ data_log_id = XT_GET_DISK_2(ext_rec->re_log_id_2);
4087+ data_log_offset = XT_GET_DISK_6(ext_rec->re_log_offs_6);
4088+ log_over_size = XT_GET_DISK_4(ext_rec->re_log_dat_siz_4);
4089+ }
4090+ dont_remove_x_record:
4091+
4092+ record_loaded = FALSE;
4093+
4094+ if (check_index) {
4095+ cols_required = tab->tab_dic.dic_ind_cols_req;
4096+ if (tab->tab_dic.dic_blob_cols_req > cols_required)
4097+ cols_required = tab->tab_dic.dic_blob_cols_req;
4098+ if (!(rec_data = xres_load_record(self, ot, rec_id, ot->ot_row_rbuffer, red_size, rec_buf, cols_required)))
4099+ goto do_rec_freed;
4100+ record_loaded = TRUE;
4101+ xres_remove_index_entries(ot, rec_id, rec_data);
4102+ }
4103+
4104+ if (tab->tab_dic.dic_blob_count) {
4105+ if (!record_loaded) {
4106+ if (tab->tab_dic.dic_blob_cols_req > cols_required)
4107+ cols_required = tab->tab_dic.dic_blob_cols_req;
4108+ if (!(rec_data = xres_load_record(self, ot, rec_id, ot->ot_row_rbuffer, red_size, rec_buf, cols_required)))
4109+ /* [(7)] REMOVE is followed by FREE:
4110+ goto get_rec_offset;
4111+ */
4112+ goto do_rec_freed;
4113+ record_loaded = TRUE;
4114+ }
4115+#ifdef XT_STREAMING
4116+ myxt_release_blobs(ot, rec_data, rec_id);
4117+#endif
4118+ }
4119+
4120+ if (record->xl.xl_status_1 == XT_LOG_ENT_REC_REMOVED_EXT) {
4121+ /* Note: dlb_delete_log() may be repeated, but should handle this:
4122+ *
4123+ * Example:
4124+ * log=5 offset=213334 CLEAN REC op=28175 tab=1 rec=317428
4125+ * ...
4126+ * log=6 offset=321063 REMOVED-X REC op=33878 tab=1 rec=317428
4127+ *
4128+ * When this sequence is repeated during recovery, then CLEAN REC
4129+ * will reset the status byte of the record so that it
4130+ * comes back to here!
4131+ *
4132+ * The check for zero is probably not required here.
4133+ */
4134+ if (data_log_id && data_log_offset && log_over_size) {
4135+ if (!ot->ot_thread->st_dlog_buf.dlb_delete_log(data_log_id, data_log_offset, log_over_size, tab->tab_id, rec_id, self)) {
4136+ if (ot->ot_thread->t_exception.e_xt_err != XT_ERR_BAD_EXT_RECORD &&
4137+ ot->ot_thread->t_exception.e_xt_err != XT_ERR_DATA_LOG_NOT_FOUND)
4138+ xt_log_and_clear_exception_ns();
4139+ }
4140+ }
4141+ }
4142+
4143+ goto do_rec_freed;
4144+ }
4145+ case XT_LOG_ENT_REC_REMOVED_BI: {
4146+ /*
4147+ * For deletion we need the complete before image because of the following problem.
4148+ *
4149+ * DROP TABLE IF EXISTS t1;
4150+ * CREATE TABLE t1 (ID int primary key auto_increment, value int, index (value)) engine=pbxt;
4151+ *
4152+ * insert t1(value) values(50);
4153+ *
4154+ * -- CHECKPOINT --
4155+ *
4156+ * update t1 set value = 60;
4157+ *
4158+ * -- PAUSE --
4159+ *
4160+ * update t1 set value = 70;
4161+ *
4162+ * -- CRASH --
4163+ *
4164+ * select value from t1;
4165+ * select * from t1;
4166+ *
4167+ * 081203 12:11:46 [Note] PBXT: Recovering from 1-148, bytes to read: 33554284
4168+ * log=1 offset=148 UPDATE-BG op=5 tab=1 rec=2 row=1 xact=3
4169+ * log=1 offset=188 REC ADD ROW op=6 tab=1 row=1
4170+ * log=1 offset=206 COMMIT xact=3
4171+ * log=1 offset=216 REMOVED REC op=7 tab=1 rec=1 xact=2
4172+ * log=1 offset=241 CLEAN REC op=8 tab=1 rec=2
4173+ * log=1 offset=261 CLEANUP xact=3
4174+ * log=1 offset=267 UPDATE-FL-BG op=9 tab=1 rec=1 row=1 xact=4
4175+ * log=1 offset=311 REC ADD ROW op=10 tab=1 row=1
4176+ * log=1 offset=329 COMMIT xact=4
4177+ * log=1 offset=339 REMOVED REC op=11 tab=1 rec=2 xact=3
4178+ * log=1 offset=364 CLEAN REC op=12 tab=1 rec=1
4179+ * log=1 offset=384 CLEANUP xact=4
4180+ * 081203 12:12:15 [Note] PBXT: Recovering complete at 1-390, bytes read: 33554284
4181+ *
4182+ * mysql> select value from t1;
4183+ * +-------+
4184+ * | value |
4185+ * +-------+
4186+ * | 50 |
4187+ * | 70 |
4188+ * +-------+
4189+ * 2 rows in set (55.99 sec)
4190+ *
4191+ * mysql> select * from t1;
4192+ * +----+-------+
4193+ * | ID | value |
4194+ * +----+-------+
4195+ * | 1 | 70 |
4196+ * +----+-------+
4197+ * 1 row in set (0.00 sec)
4198+ */
4199+ XTTabRecExtDPtr ext_rec;
4200+ xtWord4 log_over_size = 0;
4201+ xtLogID data_log_id = 0;
4202+ xtLogOffset data_log_offset = 0;
4203+ u_int cols_required = 0;
4204+ xtBool record_loaded;
4205+ size_t rec_size;
4206+
4207+ rec_id = XT_GET_DISK_4(record->rb.rb_rec_id_4);
4208+ rec_size = XT_GET_DISK_2(record->rb.rb_size_2);
4209+
4210+ ext_rec = (XTTabRecExtDPtr) &record->rb.rb_rec_type_1;
4211+
4212+ if (XT_REC_IS_EXT_DLOG(record->rb.rb_rec_type_1)) {
4213+ /* Save this for later (can be overwritten by xres_load_record(): */
4214+ data_log_id = XT_GET_DISK_2(ext_rec->re_log_id_2);
4215+ data_log_offset = XT_GET_DISK_6(ext_rec->re_log_offs_6);
4216+ log_over_size = XT_GET_DISK_4(ext_rec->re_log_dat_siz_4);
4217+ }
4218+
4219+ record_loaded = FALSE;
4220+
4221+ if (check_index) {
4222+ cols_required = tab->tab_dic.dic_ind_cols_req;
4223+#ifdef XT_STREAMING
4224+ if (tab->tab_dic.dic_blob_cols_req > cols_required)
4225+ cols_required = tab->tab_dic.dic_blob_cols_req;
4226+#endif
4227+ if (!(rec_data = xres_load_record(self, ot, rec_id, &record->rb.rb_rec_type_1, rec_size, rec_buf, cols_required)))
4228+ goto go_on_to_free;
4229+ record_loaded = TRUE;
4230+ xres_remove_index_entries(ot, rec_id, rec_data);
4231+ }
4232+
4233+#ifdef XT_STREAMING
4234+ if (tab->tab_dic.dic_blob_count) {
4235+ if (!record_loaded) {
4236+ cols_required = tab->tab_dic.dic_blob_cols_req;
4237+ if (!(rec_data = xres_load_record(self, ot, rec_id, &record->rb.rb_rec_type_1, rec_size, rec_buf, cols_required)))
4238+ /* [(7)] REMOVE is followed by FREE:
4239+ goto get_rec_offset;
4240+ */
4241+ goto go_on_to_free;
4242+ record_loaded = TRUE;
4243+ }
4244+ myxt_release_blobs(ot, rec_data, rec_id);
4245+ }
4246+#endif
4247+
4248+ if (data_log_id && data_log_offset && log_over_size) {
4249+ if (!ot->ot_thread->st_dlog_buf.dlb_delete_log(data_log_id, data_log_offset, log_over_size, tab->tab_id, rec_id, self)) {
4250+ if (ot->ot_thread->t_exception.e_xt_err != XT_ERR_BAD_EXT_RECORD &&
4251+ ot->ot_thread->t_exception.e_xt_err != XT_ERR_DATA_LOG_NOT_FOUND)
4252+ xt_log_and_clear_exception_ns();
4253+ }
4254+ }
4255+
4256+ go_on_to_free:
4257+ /* Use the new record type: */
4258+ record->rb.rb_rec_type_1 = record->rb.rb_new_rec_type_1;
4259+ free_data = (XTTabRecFreeDPtr) &record->rb.rb_rec_type_1;
4260+ goto do_rec_freed;
4261+ }
4262+ case XT_LOG_ENT_REC_FREED:
4263+ rec_id = XT_GET_DISK_4(record->fr.fr_rec_id_4);
4264+ free_data = (XTTabRecFreeDPtr) &record->fr.fr_rec_type_1;
4265+ do_rec_freed:
4266+ if (!in_sequence) {
4267+ size_t red_size;
4268+
4269+ /* Free the record.
4270+ * We place the record on front of the current
4271+ * free list.
4272+ *
4273+ * However, before we do this, we remove the record
4274+ * from its row list, if the record is on a row list.
4275+ *
4276+ * We do this here, because in the normal removal
4277+ * from the row list uses the operations:
4278+ *
4279+ * XT_LOG_ENT_REC_UNLINKED, XT_LOG_ENT_ROW_SET and
4280+ * XT_LOG_ENT_ROW_FREED.
4281+ *
4282+ * When operations are performed out of sequence,
4283+ * these operations are ignored for the purpose
4284+ * of removing the record from the row.
4285+ */
4286+ if (!XT_PREAD_RR_FILE(ot->ot_rec_file, xt_rec_id_to_rec_offset(tab, rec_id), sizeof(XTTabRecHeadDRec), sizeof(XTTabRecHeadDRec), (xtWord1 *) &rec_head, NULL, &self->st_statistics.st_rec, self))
4287+ xt_throw(self);
4288+ /* The record is already free: */
4289+ if (XT_REC_IS_FREE(rec_head.tr_rec_type_1))
4290+ goto free_done;
4291+ row_id = XT_GET_DISK_4(rec_head.tr_row_id_4);
4292+
4293+ /* Search the row for this record: */
4294+ if (!XT_PREAD_RR_FILE(ot->ot_row_file, xt_row_id_to_row_offset(tab, row_id), sizeof(XTTabRowRefDRec), sizeof(XTTabRowRefDRec), (xtWord1 *) &row_buf, NULL, &self->st_statistics.st_rec, self))
4295+ xt_throw(self);
4296+ link_rec_id = XT_GET_DISK_4(row_buf.rr_ref_id_4);
4297+ prev_link_rec_id = 0;
4298+ while (link_rec_id) {
4299+ if (!XT_PREAD_RR_FILE(ot->ot_rec_file, xt_rec_id_to_rec_offset(tab, link_rec_id), sizeof(XTTabRecHeadDRec), 0, (xtWord1 *) &rec_head, &red_size, &self->st_statistics.st_rec, self)) {
4300+ xt_log_and_clear_exception(self);
4301+ break;
4302+ }
4303+ if (red_size < sizeof(XTTabRecHeadDRec))
4304+ break;
4305+ if (link_rec_id == rec_id)
4306+ break;
4307+ if (XT_GET_DISK_4(rec_head.tr_row_id_4) != row_id)
4308+ break;
4309+ switch (rec_head.tr_rec_type_1 & XT_TAB_STATUS_MASK) {
4310+ case XT_TAB_STATUS_FREED:
4311+ break;
4312+ case XT_TAB_STATUS_DELETE:
4313+ case XT_TAB_STATUS_FIXED:
4314+ case XT_TAB_STATUS_VARIABLE:
4315+ case XT_TAB_STATUS_EXT_DLOG:
4316+ break;
4317+ default:
4318+ ASSERT(FALSE);
4319+ goto exit_loop;
4320+ }
4321+ if (rec_head.tr_rec_type_1 & ~(XT_TAB_STATUS_CLEANED_BIT | XT_TAB_STATUS_MASK)) {
4322+ ASSERT(FALSE);
4323+ break;
4324+ }
4325+ prev_link_rec_id = link_rec_id;
4326+ link_rec_id = XT_GET_DISK_4(rec_head.tr_prev_rec_id_4);
4327+ }
4328+
4329+ exit_loop:
4330+ if (link_rec_id == rec_id) {
4331+ /* The record was found on the row list, remove it: */
4332+ if (prev_link_rec_id) {
4333+ /* We write the previous variation pointer from position 'link_rec_id' into
4334+ * variation pointer of the 'prev_link_rec_id' record. This unlinks 'link_rec_id'!
4335+ */
4336+ if (!XT_PWRITE_RR_FILE(ot->ot_rec_file, xt_rec_id_to_rec_offset(tab, prev_link_rec_id) + offsetof(XTTabRecHeadDRec, tr_prev_rec_id_4), XT_RECORD_ID_SIZE, (xtWord1 *) &rec_head.tr_prev_rec_id_4, &ot->ot_thread->st_statistics.st_rec, ot->ot_thread))
4337+ xt_throw(self);
4338+ tab->tab_bytes_to_flush += XT_RECORD_ID_SIZE;
4339+ }
4340+ else {
4341+ /* The record is at the front of the row list: */
4342+ xtRefID ref_id = XT_GET_DISK_4(rec_head.tr_prev_rec_id_4);
4343+ XT_SET_DISK_4(row_buf.rr_ref_id_4, ref_id);
4344+ if (!XT_PWRITE_RR_FILE(ot->ot_row_file, xt_row_id_to_row_offset(tab, row_id), sizeof(XTTabRowRefDRec), (xtWord1 *) &row_buf, &ot->ot_thread->st_statistics.st_rec, ot->ot_thread))
4345+ xt_throw(self);
4346+ tab->tab_bytes_to_flush += sizeof(XTTabRowRefDRec);
4347+ }
4348+ }
4349+
4350+ /* Now we free the record, by placing it at the front of
4351+ * the free list:
4352+ */
4353+ XT_SET_DISK_4(free_data->rf_next_rec_id_4, tab->tab_head_rec_free_id);
4354+ }
4355+ tab->tab_head_rec_free_id = rec_id;
4356+ tab->tab_head_rec_fnum++;
4357+ if (!XT_PWRITE_RR_FILE(ot->ot_rec_file, xt_rec_id_to_rec_offset(tab, rec_id), sizeof(XTTabRecFreeDRec), (xtWord1 *) free_data, &ot->ot_thread->st_statistics.st_rec, ot->ot_thread))
4358+ xt_throw(self);
4359+ tab->tab_bytes_to_flush += sizeof(XTTabRecFreeDRec);
4360+ tab->tab_flush_pending = TRUE;
4361+ free_done:
4362+ break;
4363+ case XT_LOG_ENT_REC_MOVED:
4364+ len = 8;
4365+ rec_id = XT_GET_DISK_4(record->xw.xw_rec_id_4);
4366+ if (!XT_PWRITE_RR_FILE(ot->ot_rec_file, xt_rec_id_to_rec_offset(tab, rec_id) + offsetof(XTTabRecExtDRec, re_log_id_2), len, (xtWord1 *) &record->xw.xw_rec_type_1, &ot->ot_thread->st_statistics.st_rec, ot->ot_thread))
4367+ xt_throw(self);
4368+ tab->tab_bytes_to_flush += len;
4369+ tab->tab_flush_pending = TRUE;
4370+ break;
4371+ case XT_LOG_ENT_REC_CLEANED:
4372+ len = offsetof(XTTabRecHeadDRec, tr_prev_rec_id_4) + XT_RECORD_ID_SIZE;
4373+ goto get_rec_offset;
4374+ case XT_LOG_ENT_REC_CLEANED_1:
4375+ len = 1;
4376+ goto get_rec_offset;
4377+ case XT_LOG_ENT_REC_UNLINKED:
4378+ if (!in_sequence) {
4379+ /* Unlink the record.
4380+ * This is done when the record is freed.
4381+ */
4382+ break;
4383+ }
4384+ len = offsetof(XTTabRecHeadDRec, tr_prev_rec_id_4) + XT_RECORD_ID_SIZE;
4385+ get_rec_offset:
4386+ rec_id = XT_GET_DISK_4(record->xw.xw_rec_id_4);
4387+ if (!XT_PWRITE_RR_FILE(ot->ot_rec_file, xt_rec_id_to_rec_offset(tab, rec_id), len, (xtWord1 *) &record->xw.xw_rec_type_1, &ot->ot_thread->st_statistics.st_rec, ot->ot_thread))
4388+ xt_throw(self);
4389+ tab->tab_bytes_to_flush += len;
4390+ tab->tab_flush_pending = TRUE;
4391+ break;
4392+ case XT_LOG_ENT_ROW_NEW:
4393+ len = offsetof(XTactRowAddedEntryDRec, xa_free_list_4);
4394+ row_id = XT_GET_DISK_4(record->xa.xa_row_id_4);
4395+ if (!in_sequence) {
4396+ /* A row was allocated from the EOF. Because operations are missing.
4397+ * The blocks between the current EOF and the new EOF need to be
4398+ * place on the free list!
4399+ */
4400+ while (tab->tab_head_row_eof_id < row_id) {
4401+ XT_SET_DISK_4(row_buf.rr_ref_id_4, tab->tab_head_row_free_id);
4402+ if (!XT_PWRITE_RR_FILE(ot->ot_row_file, xt_row_id_to_row_offset(tab, tab->tab_head_row_eof_id), sizeof(XTTabRowRefDRec), (xtWord1 *) &row_buf, &ot->ot_thread->st_statistics.st_rec, ot->ot_thread))
4403+ xt_throw(self);
4404+ tab->tab_bytes_to_flush += sizeof(XTTabRowRefDRec);
4405+ tab->tab_head_row_free_id = tab->tab_head_row_eof_id;
4406+ tab->tab_head_row_eof_id++;
4407+ }
4408+ }
4409+ if (tab->tab_head_row_eof_id < row_id + 1)
4410+ tab->tab_head_row_eof_id = row_id + 1;
4411+ tab->tab_flush_pending = TRUE;
4412+ break;
4413+ case XT_LOG_ENT_ROW_NEW_FL:
4414+ len = sizeof(XTactRowAddedEntryDRec);
4415+ row_id = XT_GET_DISK_4(record->xa.xa_row_id_4);
4416+ free_ref_id = XT_GET_DISK_4(record->xa.xa_free_list_4);
4417+ if (!in_sequence) {
4418+ size_t red_size;
4419+ /* The record was taken from the free list.
4420+ * If the operations were in sequence, then this would be
4421+ * the front of the free list now.
4422+ * However, because operations are missing, it may no
4423+ * longer be the front of the free list!
4424+ * Search and remove:
4425+ */
4426+ link_rec_id = tab->tab_head_row_free_id;
4427+ prev_link_rec_id = 0;
4428+ while (link_rec_id) {
4429+ if (!XT_PREAD_RR_FILE(ot->ot_row_file, xt_row_id_to_row_offset(tab, link_rec_id), sizeof(XTTabRowRefDRec), 0, (xtWord1 *) &row_buf, &red_size, &self->st_statistics.st_rec, self)) {
4430+ xt_log_and_clear_exception(self);
4431+ break;
4432+ }
4433+ if (red_size < sizeof(XTTabRowRefDRec))
4434+ break;
4435+ if (link_rec_id == row_id)
4436+ break;
4437+ prev_link_rec_id = link_rec_id;
4438+ link_rec_id = XT_GET_DISK_4(row_buf.rr_ref_id_4);
4439+ }
4440+ if (link_rec_id == row_id) {
4441+ /* The block was found on the free list, remove it: */
4442+ if (prev_link_rec_id) {
4443+ /* We write the record from position 'link_rec_id' into
4444+ * position 'prev_link_rec_id'. This unlinks 'link_rec_id'!
4445+ */
4446+ if (!XT_PWRITE_RR_FILE(ot->ot_row_file, xt_row_id_to_row_offset(tab, prev_link_rec_id), sizeof(XTTabRowRefDRec), (xtWord1 *) &row_buf, &ot->ot_thread->st_statistics.st_rec, ot->ot_thread))
4447+ xt_throw(self);
4448+ tab->tab_bytes_to_flush += sizeof(XTTabRowRefDRec);
4449+ free_ref_id = tab->tab_head_row_free_id;
4450+ }
4451+ else
4452+ /* The block is at the front of the free list: */
4453+ free_ref_id = XT_GET_DISK_4(row_buf.rr_ref_id_4);
4454+ }
4455+ else {
4456+ /* Not found? */
4457+ if (tab->tab_head_row_eof_id < row_id + 1)
4458+ tab->tab_head_row_eof_id = row_id + 1;
4459+ break;
4460+ }
4461+
4462+ }
4463+ if (tab->tab_head_row_eof_id < row_id + 1)
4464+ tab->tab_head_row_eof_id = row_id + 1;
4465+ tab->tab_head_row_free_id = free_ref_id;
4466+ tab->tab_head_row_fnum--;
4467+ tab->tab_flush_pending = TRUE;
4468+ break;
4469+ case XT_LOG_ENT_ROW_FREED:
4470+ row_id = XT_GET_DISK_4(record->wr.wr_row_id_4);
4471+ if (!in_sequence) {
4472+ /* Free the row.
4473+ * Since this operation is being performed out of sequence, we
4474+ * must assume that some other free and allocation operations
4475+ * must be missing.
4476+ * For this reason, we add the row to the front of the
4477+ * existing free list.
4478+ */
4479+ XT_SET_DISK_4(record->wr.wr_ref_id_4, tab->tab_head_row_free_id);
4480+ }
4481+ tab->tab_head_row_free_id = row_id;
4482+ tab->tab_head_row_fnum++;
4483+ goto write_row_data;
4484+ case XT_LOG_ENT_ROW_ADD_REC:
4485+ row_id = XT_GET_DISK_4(record->wr.wr_row_id_4);
4486+ if (!in_sequence) {
4487+ if (!XT_PREAD_RR_FILE(ot->ot_row_file, xt_row_id_to_row_offset(tab, row_id), sizeof(XTTabRowRefDRec), 0, (xtWord1 *) &row_buf, &tfer, &self->st_statistics.st_rec, self))
4488+ xt_throw(self);
4489+ if (tfer == sizeof(XTTabRowRefDRec)) {
4490+ /* Add a record to the front of the row.
4491+ * This is easy, but we have to make sure that the next
4492+ * pointer in the record is correct.
4493+ */
4494+ rec_id = XT_GET_DISK_4(record->wr.wr_ref_id_4);
4495+ if (!XT_PREAD_RR_FILE(ot->ot_rec_file, xt_rec_id_to_rec_offset(tab, rec_id), sizeof(XTTabRecHeadDRec), 0, (xtWord1 *) &rec_head, &tfer, &self->st_statistics.st_rec, self))
4496+ xt_throw(self);
4497+ if (tfer == sizeof(XTTabRecHeadDRec) && XT_GET_DISK_4(rec_head.tr_row_id_4) == row_id) {
4498+ /* This is now the correct next pointer: */
4499+ xtRecordID next_ref_id = XT_GET_DISK_4(row_buf.rr_ref_id_4);
4500+ if (XT_GET_DISK_4(rec_head.tr_prev_rec_id_4) != next_ref_id &&
4501+ rec_id != next_ref_id) {
4502+ XT_SET_DISK_4(rec_head.tr_prev_rec_id_4, next_ref_id);
4503+ if (!XT_PWRITE_RR_FILE(ot->ot_rec_file, xt_rec_id_to_rec_offset(tab, rec_id), sizeof(XTTabRecHeadDRec), (xtWord1 *) &rec_head, &ot->ot_thread->st_statistics.st_rec, ot->ot_thread))
4504+ xt_throw(self);
4505+ tab->tab_bytes_to_flush += sizeof(XTTabRecHeadDRec);
4506+ }
4507+ }
4508+ }
4509+
4510+ }
4511+ goto write_row_data;
4512+ case XT_LOG_ENT_ROW_SET:
4513+ if (!in_sequence)
4514+ /* This operation is ignored when out of sequence!
4515+ * The operation is used to remove a record from a row.
4516+ * This is done automatically when the record is freed.
4517+ */
4518+ break;
4519+ row_id = XT_GET_DISK_4(record->wr.wr_row_id_4);
4520+ write_row_data:
4521+ ASSERT_NS(XT_GET_DISK_4(record->wr.wr_ref_id_4) < tab->tab_head_rec_eof_id);
4522+ if (!XT_PWRITE_RR_FILE(ot->ot_row_file, xt_row_id_to_row_offset(tab, row_id), sizeof(XTTabRowRefDRec), (xtWord1 *) &record->wr.wr_ref_id_4, &ot->ot_thread->st_statistics.st_rec, self))
4523+ xt_throw(self);
4524+ tab->tab_bytes_to_flush += sizeof(XTTabRowRefDRec);
4525+ if (tab->tab_head_row_eof_id < row_id + 1)
4526+ tab->tab_head_row_eof_id = row_id + 1;
4527+ tab->tab_flush_pending = TRUE;
4528+ break;
4529+ case XT_LOG_ENT_NO_OP:
4530+ case XT_LOG_ENT_END_OF_LOG:
4531+ break;
4532+ }
4533+}
4534+
4535+/*
4536+ * Apply all operations that have been buffered
4537+ * for a particular table.
4538+ * Operations are buffered if they are
4539+ * read from the log out of sequence.
4540+ *
4541+ * In this case we buffer, and wait for the
4542+ * out of sequence operations to arrive.
4543+ *
4544+ * When the server is running, this will always be
4545+ * the case. A delay occurs while a transaction
4546+ * fills its private log buffer.
4547+ */
4548+static void xres_apply_operations(XTThreadPtr self, XTWriterStatePtr ws, xtBool in_sequence)
4549+{
4550+ XTTableHPtr tab = ws->ws_ot->ot_table;
4551+ u_int i = 0;
4552+ XTOperationPtr op;
4553+ xtBool check_index;
4554+
4555+// XTDatabaseHPtr db, XTOpenTablePtr ot, XTXactSeqReadPtr sr, XTDataBufferPtr databuf
4556+ xt_sl_lock(self, tab->tab_op_list);
4557+ for (;;) {
4558+ op = (XTOperationPtr) xt_sl_item_at(tab->tab_op_list, i);
4559+ if (!op)
4560+ break;
4561+ if (in_sequence && tab->tab_head_op_seq+1 != op->or_op_seq)
4562+ break;
4563+ xt_db_set_size(self, &ws->ws_databuf, (size_t) op->or_op_len);
4564+ if (!ws->ws_db->db_xlog.xlog_rnd_read(&ws->ws_seqread, op->or_log_id, op->or_log_offset, (size_t) op->or_op_len, ws->ws_databuf.db_data, NULL, self))
4565+ xt_throw(self);
4566+ check_index = ws->ws_in_recover && xt_comp_log_pos(op->or_log_id, op->or_log_offset, ws->ws_ind_rec_log_id, ws->ws_ind_rec_log_offset) >= 0;
4567+ xres_apply_change(self, ws->ws_ot, (XTXactLogBufferDPtr) ws->ws_databuf.db_data, in_sequence, check_index, &ws->ws_rec_buf);
4568+ tab->tab_head_op_seq = op->or_op_seq;
4569+ if (tab->tab_wr_wake_freeer) {
4570+ if (!XTTableSeq::xt_op_is_before(tab->tab_head_op_seq, tab->tab_wake_freeer_op))
4571+ xt_wr_wake_freeer(self);
4572+ }
4573+ i++;
4574+ }
4575+ xt_sl_remove_from_front(self, tab->tab_op_list, i);
4576+ xt_sl_unlock(self, tab->tab_op_list);
4577+}
4578+
4579+/* Check for operations still remaining on tables.
4580+ * These operations are applied even though operations
4581+ * in sequence are missing.
4582+ */
4583+xtBool xres_sync_operations(XTThreadPtr self, XTDatabaseHPtr db, XTWriterStatePtr ws)
4584+{
4585+ u_int edx;
4586+ XTTableEntryPtr te_ptr;
4587+ XTTableHPtr tab;
4588+ xtBool op_synced = FALSE;
4589+
4590+ xt_enum_tables_init(&edx);
4591+ while ((te_ptr = xt_enum_tables_next(self, db, &edx))) {
4592+ /* Dirty read of tab_op_list OK, here because this is the
4593+ * only thread that updates the list!
4594+ */
4595+ if ((tab = te_ptr->te_table)) {
4596+ if (xt_sl_get_size(tab->tab_op_list)) {
4597+ op_synced = TRUE;
4598+ if (xres_open_table(self, ws, te_ptr->te_tab_id))
4599+ xres_apply_operations(self, ws, FALSE);
4600+ }
4601+
4602+ /* Update the pointer cache: */
4603+ tab->tab_seq.xt_op_seq_set(self, tab->tab_head_op_seq+1);
4604+ tab->tab_row_eof_id = tab->tab_head_row_eof_id;
4605+ tab->tab_row_free_id = tab->tab_head_row_free_id;
4606+ tab->tab_row_fnum = tab->tab_head_row_fnum;
4607+ tab->tab_rec_eof_id = tab->tab_head_rec_eof_id;
4608+ tab->tab_rec_free_id = tab->tab_head_rec_free_id;
4609+ tab->tab_rec_fnum = tab->tab_head_rec_fnum;
4610+ }
4611+ }
4612+ return op_synced;
4613+}
4614+
4615+/*
4616+ * Operations from the log are applied in sequence order.
4617+ * If the operations are out of sequence, they are buffered
4618+ * until the missing operations appear.
4619+ *
4620+ * NOTE: No lock is required because there should only be
4621+ * one thread that does this!
4622+ */
4623+xtPublic void xt_xres_apply_in_order(XTThreadPtr self, XTWriterStatePtr ws, xtLogID log_id, xtLogOffset log_offset, XTXactLogBufferDPtr record)
4624+{
4625+ xtOpSeqNo op_seq;
4626+ xtTableID tab_id;
4627+ size_t len;
4628+ xtBool check_index;
4629+
4630+// XTDatabaseHPtr db, XTOpenTablePtr *ot, XTXactSeqReadPtr sr, XTDataBufferPtr databuf
4631+ switch (record->xl.xl_status_1) {
4632+ case XT_LOG_ENT_REC_MODIFIED:
4633+ case XT_LOG_ENT_UPDATE:
4634+ case XT_LOG_ENT_INSERT:
4635+ case XT_LOG_ENT_DELETE:
4636+ case XT_LOG_ENT_UPDATE_BG:
4637+ case XT_LOG_ENT_INSERT_BG:
4638+ case XT_LOG_ENT_DELETE_BG:
4639+ len = offsetof(XTactUpdateEntryDRec, xu_rec_type_1) + (size_t) XT_GET_DISK_2(record->xu.xu_size_2);
4640+ op_seq = XT_GET_DISK_4(record->xu.xu_op_seq_4);
4641+ tab_id = XT_GET_DISK_4(record->xu.xu_tab_id_4);
4642+ break;
4643+ case XT_LOG_ENT_UPDATE_FL:
4644+ case XT_LOG_ENT_INSERT_FL:
4645+ case XT_LOG_ENT_DELETE_FL:
4646+ case XT_LOG_ENT_UPDATE_FL_BG:
4647+ case XT_LOG_ENT_INSERT_FL_BG:
4648+ case XT_LOG_ENT_DELETE_FL_BG:
4649+ len = offsetof(XTactUpdateFLEntryDRec, xf_rec_type_1) + (size_t) XT_GET_DISK_2(record->xf.xf_size_2);
4650+ op_seq = XT_GET_DISK_4(record->xf.xf_op_seq_4);
4651+ tab_id = XT_GET_DISK_4(record->xf.xf_tab_id_4);
4652+ break;
4653+ case XT_LOG_ENT_REC_FREED:
4654+ case XT_LOG_ENT_REC_REMOVED:
4655+ case XT_LOG_ENT_REC_REMOVED_EXT:
4656+ /* [(7)] REMOVE is now a extended version of FREE! */
4657+ len = offsetof(XTactFreeRecEntryDRec, fr_rec_type_1) + sizeof(XTTabRecFreeDRec);
4658+ goto fixed_len_data;
4659+ case XT_LOG_ENT_REC_REMOVED_BI:
4660+ len = offsetof(XTactRemoveBIEntryDRec, rb_rec_type_1) + (size_t) XT_GET_DISK_2(record->rb.rb_size_2);
4661+ op_seq = XT_GET_DISK_4(record->rb.rb_op_seq_4);
4662+ tab_id = XT_GET_DISK_4(record->rb.rb_tab_id_4);
4663+ break;
4664+ case XT_LOG_ENT_REC_MOVED:
4665+ len = offsetof(XTactWriteRecEntryDRec, xw_rec_type_1) + 8;
4666+ goto fixed_len_data;
4667+ case XT_LOG_ENT_REC_CLEANED:
4668+ len = offsetof(XTactWriteRecEntryDRec, xw_rec_type_1) + offsetof(XTTabRecHeadDRec, tr_prev_rec_id_4) + XT_RECORD_ID_SIZE;
4669+ goto fixed_len_data;
4670+ case XT_LOG_ENT_REC_CLEANED_1:
4671+ len = offsetof(XTactWriteRecEntryDRec, xw_rec_type_1) + 1;
4672+ goto fixed_len_data;
4673+ case XT_LOG_ENT_REC_UNLINKED:
4674+ len = offsetof(XTactWriteRecEntryDRec, xw_rec_type_1) + offsetof(XTTabRecHeadDRec, tr_prev_rec_id_4) + XT_RECORD_ID_SIZE;
4675+ fixed_len_data:
4676+ op_seq = XT_GET_DISK_4(record->xw.xw_op_seq_4);
4677+ tab_id = XT_GET_DISK_4(record->xw.xw_tab_id_4);
4678+ break;
4679+ case XT_LOG_ENT_ROW_NEW:
4680+ len = sizeof(XTactRowAddedEntryDRec) - 4;
4681+ goto new_row;
4682+ case XT_LOG_ENT_ROW_NEW_FL:
4683+ len = sizeof(XTactRowAddedEntryDRec);
4684+ new_row:
4685+ op_seq = XT_GET_DISK_4(record->xa.xa_op_seq_4);
4686+ tab_id = XT_GET_DISK_4(record->xa.xa_tab_id_4);
4687+ break;
4688+ case XT_LOG_ENT_ROW_ADD_REC:
4689+ case XT_LOG_ENT_ROW_SET:
4690+ case XT_LOG_ENT_ROW_FREED:
4691+ len = offsetof(XTactWriteRowEntryDRec, wr_ref_id_4) + sizeof(XTTabRowRefDRec);
4692+ op_seq = XT_GET_DISK_4(record->wr.wr_op_seq_4);
4693+ tab_id = XT_GET_DISK_4(record->wr.wr_tab_id_4);
4694+ break;
4695+ case XT_LOG_ENT_NO_OP:
4696+ case XT_LOG_ENT_END_OF_LOG:
4697+ return;
4698+ default:
4699+ return;
4700+ }
4701+
4702+ if (!xres_open_table(self, ws, tab_id))
4703+ return;
4704+
4705+ XTTableHPtr tab = ws->ws_ot->ot_table;
4706+
4707+ /* NOTE:
4708+ *
4709+ * During normal operation this is actually given.
4710+ *
4711+ * During recovery, it only applies to the record/row files
4712+ * The index file is flushed indepently, and changes may
4713+ * have been applied to the index (due to a call to flush index,
4714+ * which comes as a result of out of memory) that have not been
4715+ * applied to the record/row files.
4716+ *
4717+ * As a result we need to do the index checks that apply to this
4718+ * change.
4719+ *
4720+ * At the moment, I will just do everything, which should not
4721+ * hurt!
4722+ *
4723+ * This error can be repeated by running the test
4724+ * runTest(OUT_OF_CACHE_UPDATE_TEST, 32, OUT_OF_CACHE_UPDATE_TEST_UPDATE_COUNT, OUT_OF_CACHE_UPDATE_TEST_SET_SIZE)
4725+ * and crashing after a while.
4726+ *
4727+ * Do this by setting not_this to NULL. This will cause the test to
4728+ * hang after a while. After a restart the indexes are corrupt if the
4729+ * ws->ws_in_recover condition is not present here.
4730+ */
4731+ if (ws->ws_in_recover) {
4732+ if (!tab->tab_recovery_done) {
4733+ /* op_seq <= tab_head_op_seq + 1: */
4734+ ASSERT(XTTableSeq::xt_op_is_before(op_seq, tab->tab_head_op_seq+2));
4735+ if (XTTableSeq::xt_op_is_before(op_seq-1, tab->tab_head_op_seq))
4736+ /* Adjust the operation sequence number: */
4737+ tab->tab_head_op_seq = op_seq-1;
4738+ tab->tab_recovery_done = TRUE;
4739+ }
4740+ }
4741+
4742+ if (!XTTableSeq::xt_op_is_before(tab->tab_head_op_seq, op_seq))
4743+ return;
4744+
4745+ if (tab->tab_head_op_seq+1 == op_seq) {
4746+ /* I could use tab_ind_rec_log_id, but this may be a problem, if
4747+ * recovery does not recover up to the last committed transaction.
4748+ */
4749+ check_index = ws->ws_in_recover && xt_comp_log_pos(log_id, log_offset, ws->ws_ind_rec_log_id, ws->ws_ind_rec_log_offset) >= 0;
4750+ xres_apply_change(self, ws->ws_ot, record, TRUE, check_index, &ws->ws_rec_buf);
4751+ tab->tab_head_op_seq = op_seq;
4752+ if (tab->tab_wr_wake_freeer) {
4753+ if (!XTTableSeq::xt_op_is_before(tab->tab_head_op_seq, tab->tab_wake_freeer_op))
4754+ xt_wr_wake_freeer(self);
4755+ }
4756+
4757+ /* Apply any operations in the list that now follow on...
4758+ * NOTE: the tab_op_list only has be locked for modification.
4759+ * This is because only one thread ever changes the list
4760+ * (on startup and the writer), but the checkpoint thread
4761+ * reads it.
4762+ */
4763+ XTOperationPtr op;
4764+ if ((op = (XTOperationPtr) xt_sl_first_item(tab->tab_op_list))) {
4765+ if (tab->tab_head_op_seq+1 == op->or_op_seq) {
4766+ xres_apply_operations(self, ws, TRUE);
4767+ }
4768+ }
4769+ }
4770+ else {
4771+ /* Add the operation to the list: */
4772+ XTOperationRec op;
4773+
4774+ op.or_op_seq = op_seq;
4775+ op.or_op_len = len;
4776+ op.or_log_id = log_id;
4777+ op.or_log_offset = log_offset;
4778+ xt_sl_lock(self, tab->tab_op_list);
4779+ xt_sl_insert(self, tab->tab_op_list, &op_seq, &op);
4780+ ASSERT(tab->tab_op_list->sl_usage_count < 1000000);
4781+ xt_sl_unlock(self, tab->tab_op_list);
4782+ }
4783+}
4784+
4785+/* ----------------------------------------------------------------------
4786+ * CHECKPOINTING FUNCTIONALITY
4787+ */
4788+
4789+static xtBool xres_delete_data_log(XTDatabaseHPtr db, xtLogID log_id)
4790+{
4791+ XTDataLogFilePtr data_log;
4792+ char path[PATH_MAX];
4793+
4794+ db->db_datalogs.dlc_name(PATH_MAX, path, log_id);
4795+
4796+ if (!db->db_datalogs.dlc_remove_data_log(log_id, TRUE))
4797+ return FAILED;
4798+
4799+ if (xt_fs_exists(path)) {
4800+#ifdef DEBUG_LOG_DELETE
4801+ printf("-- delete log: %s\n", path);
4802+#endif
4803+ if (!xt_fs_delete(NULL, path))
4804+ return FAILED;
4805+ }
4806+ /* The log was deleted: */
4807+ if (!db->db_datalogs.dlc_get_data_log(&data_log, log_id, TRUE, NULL))
4808+ return FAILED;
4809+ if (data_log) {
4810+ if (!db->db_datalogs.dls_set_log_state(data_log, XT_DL_DELETED))
4811+ return FAILED;
4812+ }
4813+ return OK;
4814+}
4815+
4816+static int xres_comp_flush_tabs(XTThreadPtr XT_UNUSED(self), register const void *XT_UNUSED(thunk), register const void *a, register const void *b)
4817+{
4818+ xtTableID tab_id = *((xtTableID *) a);
4819+ XTCheckPointTablePtr cp_tab = (XTCheckPointTablePtr) b;
4820+
4821+ if (tab_id < cp_tab->cpt_tab_id)
4822+ return -1;
4823+ if (tab_id > cp_tab->cpt_tab_id)
4824+ return 1;
4825+ return 0;
4826+}
4827+
4828+static void xres_init_checkpoint_state(XTThreadPtr self, XTCheckPointStatePtr cp)
4829+{
4830+ xt_init_mutex_with_autoname(self, &cp->cp_state_lock);
4831+}
4832+
4833+static void xres_free_checkpoint_state(XTThreadPtr self, XTCheckPointStatePtr cp)
4834+{
4835+ xt_free_mutex(&cp->cp_state_lock);
4836+ if (cp->cp_table_ids) {
4837+ xt_free_sortedlist(self, cp->cp_table_ids);
4838+ cp->cp_table_ids = NULL;
4839+ }
4840+}
4841+
4842+/*
4843+ * Remove the deleted logs so that they can be re-used.
4844+ * This is only possible after a checkpoint has been
4845+ * written that does _not_ include these logs as logs
4846+ * to be deleted!
4847+ */
4848+static xtBool xres_remove_data_logs(XTDatabaseHPtr db)
4849+{
4850+ u_int no_of_logs = xt_sl_get_size(db->db_datalogs.dlc_deleted);
4851+ xtLogID *log_id_ptr;
4852+
4853+ for (u_int i=0; i<no_of_logs; i++) {
4854+ log_id_ptr = (xtLogID *) xt_sl_item_at(db->db_datalogs.dlc_deleted, i);
4855+ if (!db->db_datalogs.dlc_remove_data_log(*log_id_ptr, FALSE))
4856+ return FAILED;
4857+ }
4858+ xt_sl_set_size(db->db_datalogs.dlc_deleted, 0);
4859+ return OK;
4860+}
4861+
4862+/* ----------------------------------------------------------------------
4863+ * INIT & EXIT
4864+ */
4865+
4866+xtPublic void xt_xres_init(XTThreadPtr self, XTDatabaseHPtr db)
4867+{
4868+ xtLogID max_log_id;
4869+
4870+ xt_init_mutex_with_autoname(self, &db->db_cp_lock);
4871+ xt_init_cond(self, &db->db_cp_cond);
4872+
4873+ xres_init_checkpoint_state(self, &db->db_cp_state);
4874+ db->db_restart.xres_init(self, db, &db->db_wr_log_id, &db->db_wr_log_offset, &max_log_id);
4875+
4876+ /* It is also the position where transactions will start writing the
4877+ * log:
4878+ */
4879+ if (!db->db_xlog.xlog_set_write_offset(db->db_wr_log_id, db->db_wr_log_offset, max_log_id, self))
4880+ xt_throw(self);
4881+}
4882+
4883+xtPublic void xt_xres_exit(XTThreadPtr self, XTDatabaseHPtr db)
4884+{
4885+ db->db_restart.xres_exit(self);
4886+ xres_free_checkpoint_state(self, &db->db_cp_state);
4887+ xt_free_mutex(&db->db_cp_lock);
4888+ xt_free_cond(&db->db_cp_cond);
4889+}
4890+
4891+/* ----------------------------------------------------------------------
4892+ * RESTART FUNCTIONALITY
4893+ */
4894+
4895+/*
4896+ * Restart the database. This function loads the restart position, and
4897+ * applies all changes in the logs, until the end of the log, or
4898+ * a corrupted record is found.
4899+ *
4900+ * The restart position is the position in the log where we know that
4901+ * all the changes up to that point have been flushed to the
4902+ * database.
4903+ *
4904+ * This is called the checkpoint position. The checkpoint position
4905+ * is written alternately to 2 restart files.
4906+ *
4907+ * To make a checkpoint:
4908+ * Get the current log writer log offset.
4909+ * For each table:
4910+ * Get the log offset of the next operation on the table, if an
4911+ * operation is queued for the table.
4912+ * Flush that table, and the operation sequence to the table.
4913+ * For each unclean transaction:
4914+ * Get the log offset of the beginning of the transaction.
4915+ * Write the lowest of all log offsets to the restart file!
4916+ */
4917+
4918+void XTXactRestart::xres_init(XTThreadPtr self, XTDatabaseHPtr db, xtLogID *log_id, xtLogOffset *log_offset, xtLogID *max_log_id)
4919+{
4920+ char path[PATH_MAX];
4921+ XTOpenFilePtr of = NULL;
4922+ XTXlogCheckpointDPtr res_1_buffer = NULL;
4923+ XTXlogCheckpointDPtr res_2_buffer = NULL;
4924+ XTXlogCheckpointDPtr use_buffer;
4925+ xtLogID ind_rec_log_id = 0;
4926+ xtLogOffset ind_rec_log_offset = 0;
4927+
4928+ enter_();
4929+ xres_db = db;
4930+
4931+ ASSERT(!self->st_database);
4932+ /* The following call stack:
4933+ * XTDatabaseLog::xlog_flush_pending()
4934+ * XTDatabaseLog::xlog_flush()
4935+ * xt_xlog_flush_log()
4936+ * xt_flush_indices()
4937+ * idx_out_of_memory_failure()
4938+ * xt_idx_delete()
4939+ * xres_remove_index_entries()
4940+ * xres_apply_change()
4941+ * xt_xres_apply_in_order()
4942+ * XTXactRestart::xres_restart()
4943+ * XTXactRestart::xres_init()
4944+ * Leads to st_database being used!
4945+ */
4946+ self->st_database = db;
4947+
4948+#ifdef SKIP_STARTUP_CHECKPOINT
4949+ /* When debugging, we do not checkpoint immediately, just in case
4950+ * we detect a problem during recovery.
4951+ */
4952+ xres_cp_required = FALSE;
4953+#else
4954+ xres_cp_required = TRUE;
4955+#endif
4956+ xres_cp_number = 0;
4957+ try_(a) {
4958+
4959+ /* Figure out which restart file to use.
4960+ */
4961+ xres_name(PATH_MAX, path, 1);
4962+ if ((of = xt_open_file(self, path, XT_FS_MISSING_OK))) {
4963+ size_t res_1_size;
4964+
4965+ res_1_size = (size_t) xt_seek_eof_file(self, of);
4966+ res_1_buffer = (XTXlogCheckpointDPtr) xt_malloc(self, res_1_size);
4967+ if (!xt_pread_file(of, 0, res_1_size, res_1_size, res_1_buffer, NULL, &self->st_statistics.st_x, self))
4968+ xt_throw(self);
4969+ xt_close_file(self, of);
4970+ of = NULL;
4971+ if (!xres_check_checksum(res_1_buffer, res_1_size)) {
4972+ xt_free(self, res_1_buffer);
4973+ res_1_buffer = NULL;
4974+ }
4975+ }
4976+
4977+ xres_name(PATH_MAX, path, 2);
4978+ if ((of = xt_open_file(self, path, XT_FS_MISSING_OK))) {
4979+ size_t res_2_size;
4980+
4981+ res_2_size = (size_t) xt_seek_eof_file(self, of);
4982+ res_2_buffer = (XTXlogCheckpointDPtr) xt_malloc(self, res_2_size);
4983+ if (!xt_pread_file(of, 0, res_2_size, res_2_size, res_2_buffer, NULL, &self->st_statistics.st_x, self))
4984+ xt_throw(self);
4985+ xt_close_file(self, of);
4986+ of = NULL;
4987+ if (!xres_check_checksum(res_2_buffer, res_2_size)) {
4988+ xt_free(self, res_2_buffer);
4989+ res_2_buffer = NULL;
4990+ }
4991+ }
4992+
4993+ if (res_1_buffer && res_2_buffer) {
4994+ if (xt_comp_log_pos(
4995+ XT_GET_DISK_4(res_1_buffer->xcp_log_id_4),
4996+ XT_GET_DISK_6(res_1_buffer->xcp_log_offs_6),
4997+ XT_GET_DISK_4(res_2_buffer->xcp_log_id_4),
4998+ XT_GET_DISK_6(res_2_buffer->xcp_log_offs_6)) > 0) {
4999+ /* The first log is the further along than the second: */
5000+ xt_free(self, res_2_buffer);
The diff has been truncated for viewing.

Subscribers

People subscribed via source and target branches