Merge lp:~vkolesnikov/pbxt/pbxt-bug-379315 into lp:pbxt

Proposed by Vladimir Kolesnikov
Status: Merged
Merged at revision: not available
Proposed branch: lp:~vkolesnikov/pbxt/pbxt-bug-379315
Merge into: lp:pbxt
Diff against target: None lines
To merge this branch: bzr merge lp:~vkolesnikov/pbxt/pbxt-bug-379315
Reviewer Review Type Date Requested Status
PBXT Core Pending
Review via email: mp+7292@code.launchpad.net
To post a comment you must log in.
lp:~vkolesnikov/pbxt/pbxt-bug-379315 updated
657. By Paul McCullagh

Merged RN245

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'ChangeLog'
--- ChangeLog 2009-06-03 14:53:00 +0000
+++ ChangeLog 2009-06-10 16:26:33 +0000
@@ -3,6 +3,10 @@
33
4------- 1.0.08 RC - Not yet released4------- 1.0.08 RC - Not yet released
55
6RN245: Fixed bug #379315: Inconsistent behavior of DELETE IGNORE and FK constraint
7
8RN244: Fixed a recovery problem: during the recovery of a "record modified" action, the table was updated before the old index entries were removed; xres_remove_index_entries was then supplied with the new record, which led to an incorrect index update
9
6RN243: Fixed a bug that caused a recovery failure if partitioned pbxt tables were present. This happened because the recovery used a MySQL function to open tables and the PBXT handler was not yet registered10RN243: Fixed a bug that caused a recovery failure if partitioned pbxt tables were present. This happened because the recovery used a MySQL function to open tables and the PBXT handler was not yet registered
711
8RN242: Fixed a bug that caused a deadlock if pbxt initialization failed. This happened because pbxt cleanup was done from pbxt_init() with PLUGIN_lock being held by MySQL, which led to a deadlock in the freeer thread12RN242: Fixed a bug that caused a deadlock if pbxt initialization failed. This happened because pbxt cleanup was done from pbxt_init() with PLUGIN_lock being held by MySQL, which led to a deadlock in the freeer thread
913
=== modified file 'src/ha_pbxt.cc'
--- src/ha_pbxt.cc 2009-06-03 14:07:54 +0000
+++ src/ha_pbxt.cc 2009-06-10 16:26:33 +0000
@@ -1565,7 +1565,11 @@
1565 freer_(); // xt_unlock_mutex(share->sh_ex_mutex)1565 freer_(); // xt_unlock_mutex(share->sh_ex_mutex)
1566}1566}
15671567
1568#ifdef PBXT_ALLOW_PRINTING
1569static void ha_release_exclusive_use(XTThreadPtr self, XTSharePtr share)
1570#else
1568static void ha_release_exclusive_use(XTThreadPtr XT_UNUSED(self), XTSharePtr share)1571static void ha_release_exclusive_use(XTThreadPtr XT_UNUSED(self), XTSharePtr share)
1572#endif
1569{1573{
1570 XT_PRINT1(self, "ha_release_exclusive_use %s PBXT X UNLOCK\n", share->sh_table_path->ps_path);1574 XT_PRINT1(self, "ha_release_exclusive_use %s PBXT X UNLOCK\n", share->sh_table_path->ps_path);
1571 xt_lock_mutex_ns((xt_mutex_type *) share->sh_ex_mutex);1575 xt_lock_mutex_ns((xt_mutex_type *) share->sh_ex_mutex);
@@ -4164,10 +4168,15 @@
4164 pb_open_tab->ot_is_modify = FALSE;4168 pb_open_tab->ot_is_modify = FALSE;
4165 if ((pb_open_tab->ot_for_update = (lock_type == F_WRLCK))) {4169 if ((pb_open_tab->ot_for_update = (lock_type == F_WRLCK))) {
4166 switch ((int) thd_sql_command(thd)) {4170 switch ((int) thd_sql_command(thd)) {
4171 case SQLCOM_DELETE:
4172 case SQLCOM_DELETE_MULTI:
4173 /* turn DELETE IGNORE into normal DELETE. The IGNORE option causes problems because
4174 * when a record is deleted we add an xlog record which we cannot "rollback" later
4175 * when we find that an FK-constraint has failed.
4176 */
4177 thd->lex->ignore = false;
4167 case SQLCOM_UPDATE:4178 case SQLCOM_UPDATE:
4168 case SQLCOM_UPDATE_MULTI:4179 case SQLCOM_UPDATE_MULTI:
4169 case SQLCOM_DELETE:
4170 case SQLCOM_DELETE_MULTI:
4171 case SQLCOM_REPLACE:4180 case SQLCOM_REPLACE:
4172 case SQLCOM_REPLACE_SELECT:4181 case SQLCOM_REPLACE_SELECT:
4173 case SQLCOM_INSERT:4182 case SQLCOM_INSERT:
@@ -4644,7 +4653,7 @@
4644{4653{
4645 THD *thd = current_thd;4654 THD *thd = current_thd;
4646 int err = 0;4655 int err = 0;
4647 XTThreadPtr self;4656 XTThreadPtr self = NULL;
4648 XTSharePtr share;4657 XTSharePtr share;
46494658
4650 STAT_TRACE(self, *thd_query(thd));4659 STAT_TRACE(self, *thd_query(thd));
46514660
=== modified file 'src/restart_xt.cc'
--- src/restart_xt.cc 2009-06-03 14:07:54 +0000
+++ src/restart_xt.cc 2009-06-10 16:27:03 +0000
@@ -1,3202 +1,3207 @@
1/* Copyright (c) 2007 PrimeBase Technologies GmbH1/* Copyright (c) 2007 PrimeBase Technologies GmbH
2 *2 *
3 * PrimeBase XT3 * PrimeBase XT
4 *4 *
5 * This program is free software; you can redistribute it and/or modify5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.8 * (at your option) any later version.
9 *9 *
10 * This program is distributed in the hope that it will be useful,10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.13 * GNU General Public License for more details.
14 *14 *
15 * You should have received a copy of the GNU General Public License15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA17 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18 *18 *
19 * 2007-11-12 Paul McCullagh19 * 2007-11-12 Paul McCullagh
20 *20 *
21 * H&G2JCtL21 * H&G2JCtL
22 *22 *
23 * Restart and write data to the database.23 * Restart and write data to the database.
24 */24 */
2525
26#include "xt_config.h"26#include "xt_config.h"
2727
28#include <signal.h>28#include <signal.h>
29#include <time.h>29#include <time.h>
3030
31#ifndef DRIZZLED31#ifndef DRIZZLED
32#include "mysql_priv.h"32#include "mysql_priv.h"
33#endif33#endif
3434
35#include "ha_pbxt.h"35#include "ha_pbxt.h"
3636
37#include "xactlog_xt.h"37#include "xactlog_xt.h"
38#include "database_xt.h"38#include "database_xt.h"
39#include "util_xt.h"39#include "util_xt.h"
40#include "strutil_xt.h"40#include "strutil_xt.h"
41#include "filesys_xt.h"41#include "filesys_xt.h"
42#include "restart_xt.h"42#include "restart_xt.h"
43#include "myxt_xt.h"43#include "myxt_xt.h"
44#include "trace_xt.h"44#include "trace_xt.h"
4545
46#ifdef DEBUG46#ifdef DEBUG
47//#define DEBUG_PRINT47//#define DEBUG_PRINT
48//#define DEBUG_KEEP_LOGS48//#define DEBUG_KEEP_LOGS
49//#define PRINT_LOG_ON_RECOVERY49//#define PRINT_LOG_ON_RECOVERY
50//#define TRACE_RECORD_DATA50//#define TRACE_RECORD_DATA
51//#define SKIP_STARTUP_CHECKPOINT51//#define SKIP_STARTUP_CHECKPOINT
52//#define NEVER_CHECKPOINT52//#define NEVER_CHECKPOINT
53//#define TRACE_CHECKPOINT53//#define TRACE_CHECKPOINT
54#endif54#endif
5555
56#define PRINTF printf56#define PRINTF printf
57//#define PRINTF xt_ftracef57//#define PRINTF xt_ftracef
58//#define PRINTF xt_trace58//#define PRINTF xt_trace
5959
60void xt_print_bytes(xtWord1 *buf, u_int len)60void xt_print_bytes(xtWord1 *buf, u_int len)
61{61{
62 for (u_int i=0; i<len; i++) {62 for (u_int i=0; i<len; i++) {
63 PRINTF("%02x ", (u_int) *buf);63 PRINTF("%02x ", (u_int) *buf);
64 buf++;64 buf++;
65 }65 }
66}66}
6767
68void xt_print_log_record(xtLogID log, xtLogOffset offset, XTXactLogBufferDPtr record)68void xt_print_log_record(xtLogID log, xtLogOffset offset, XTXactLogBufferDPtr record)
69{69{
70 const char *type = NULL;70 const char *type = NULL;
71 const char *rec_type = NULL;71 const char *rec_type = NULL;
72 xtOpSeqNo op_no = 0;72 xtOpSeqNo op_no = 0;
73 xtTableID tab_id = 0;73 xtTableID tab_id = 0;
74 xtRowID row_id = 0;74 xtRowID row_id = 0;
75 xtRecordID rec_id = 0;75 xtRecordID rec_id = 0;
76 xtBool xn_set = FALSE;76 xtBool xn_set = FALSE;
77 xtXactID xn_id = 0;77 xtXactID xn_id = 0;
78 char buffer[200];78 char buffer[200];
79 XTTabRecExtDPtr rec_buf;79 XTTabRecExtDPtr rec_buf;
80 XTTabRecExtDPtr ext_rec;80 XTTabRecExtDPtr ext_rec;
81 XTTabRecFixDPtr fix_rec;81 XTTabRecFixDPtr fix_rec;
82 u_int rec_len;82 u_int rec_len;
83 xtLogID log_id = 0;83 xtLogID log_id = 0;
84 xtLogOffset log_offset = 0;84 xtLogOffset log_offset = 0;
8585
86 rec_buf = NULL;86 rec_buf = NULL;
87 ext_rec = NULL;87 ext_rec = NULL;
88 fix_rec = NULL;88 fix_rec = NULL;
89 rec_len = 0;89 rec_len = 0;
90 switch (record->xl.xl_status_1) {90 switch (record->xl.xl_status_1) {
91 case XT_LOG_ENT_REC_MODIFIED:91 case XT_LOG_ENT_REC_MODIFIED:
92 case XT_LOG_ENT_UPDATE:92 case XT_LOG_ENT_UPDATE:
93 case XT_LOG_ENT_INSERT:93 case XT_LOG_ENT_INSERT:
94 case XT_LOG_ENT_DELETE:94 case XT_LOG_ENT_DELETE:
95 case XT_LOG_ENT_UPDATE_BG:95 case XT_LOG_ENT_UPDATE_BG:
96 case XT_LOG_ENT_INSERT_BG:96 case XT_LOG_ENT_INSERT_BG:
97 case XT_LOG_ENT_DELETE_BG:97 case XT_LOG_ENT_DELETE_BG:
98 op_no = XT_GET_DISK_4(record->xu.xu_op_seq_4);98 op_no = XT_GET_DISK_4(record->xu.xu_op_seq_4);
99 tab_id = XT_GET_DISK_4(record->xu.xu_tab_id_4);99 tab_id = XT_GET_DISK_4(record->xu.xu_tab_id_4);
100 rec_id = XT_GET_DISK_4(record->xu.xu_rec_id_4);100 rec_id = XT_GET_DISK_4(record->xu.xu_rec_id_4);
101 xn_id = XT_GET_DISK_4(record->xu.xu_xact_id_4);101 xn_id = XT_GET_DISK_4(record->xu.xu_xact_id_4);
102 row_id = XT_GET_DISK_4(record->xu.xu_row_id_4);102 row_id = XT_GET_DISK_4(record->xu.xu_row_id_4);
103 rec_len = XT_GET_DISK_2(record->xu.xu_size_2);103 rec_len = XT_GET_DISK_2(record->xu.xu_size_2);
104 xn_set = TRUE;104 xn_set = TRUE;
105 type="rec";105 type="rec";
106 rec_buf = (XTTabRecExtDPtr) &record->xu.xu_rec_type_1;106 rec_buf = (XTTabRecExtDPtr) &record->xu.xu_rec_type_1;
107 ext_rec = (XTTabRecExtDPtr) &record->xu.xu_rec_type_1;107 ext_rec = (XTTabRecExtDPtr) &record->xu.xu_rec_type_1;
108 if (XT_REC_IS_EXT_DLOG(ext_rec->tr_rec_type_1)) {108 if (XT_REC_IS_EXT_DLOG(ext_rec->tr_rec_type_1)) {
109 log_id = XT_GET_DISK_2(ext_rec->re_log_id_2);109 log_id = XT_GET_DISK_2(ext_rec->re_log_id_2);
110 log_offset = XT_GET_DISK_6(ext_rec->re_log_offs_6);110 log_offset = XT_GET_DISK_6(ext_rec->re_log_offs_6);
111 }111 }
112 else {112 else {
113 ext_rec = NULL;113 ext_rec = NULL;
114 fix_rec = (XTTabRecFixDPtr) &record->xu.xu_rec_type_1;114 fix_rec = (XTTabRecFixDPtr) &record->xu.xu_rec_type_1;
115 }115 }
116 break;116 break;
117 case XT_LOG_ENT_UPDATE_FL:117 case XT_LOG_ENT_UPDATE_FL:
118 case XT_LOG_ENT_INSERT_FL:118 case XT_LOG_ENT_INSERT_FL:
119 case XT_LOG_ENT_DELETE_FL:119 case XT_LOG_ENT_DELETE_FL:
120 case XT_LOG_ENT_UPDATE_FL_BG:120 case XT_LOG_ENT_UPDATE_FL_BG:
121 case XT_LOG_ENT_INSERT_FL_BG:121 case XT_LOG_ENT_INSERT_FL_BG:
122 case XT_LOG_ENT_DELETE_FL_BG:122 case XT_LOG_ENT_DELETE_FL_BG:
123 op_no = XT_GET_DISK_4(record->xf.xf_op_seq_4);123 op_no = XT_GET_DISK_4(record->xf.xf_op_seq_4);
124 tab_id = XT_GET_DISK_4(record->xf.xf_tab_id_4);124 tab_id = XT_GET_DISK_4(record->xf.xf_tab_id_4);
125 rec_id = XT_GET_DISK_4(record->xf.xf_rec_id_4);125 rec_id = XT_GET_DISK_4(record->xf.xf_rec_id_4);
126 xn_id = XT_GET_DISK_4(record->xf.xf_xact_id_4);126 xn_id = XT_GET_DISK_4(record->xf.xf_xact_id_4);
127 row_id = XT_GET_DISK_4(record->xf.xf_row_id_4);127 row_id = XT_GET_DISK_4(record->xf.xf_row_id_4);
128 rec_len = XT_GET_DISK_2(record->xf.xf_size_2);128 rec_len = XT_GET_DISK_2(record->xf.xf_size_2);
129 xn_set = TRUE;129 xn_set = TRUE;
130 type="rec";130 type="rec";
131 rec_buf = (XTTabRecExtDPtr) &record->xf.xf_rec_type_1;131 rec_buf = (XTTabRecExtDPtr) &record->xf.xf_rec_type_1;
132 ext_rec = (XTTabRecExtDPtr) &record->xf.xf_rec_type_1;132 ext_rec = (XTTabRecExtDPtr) &record->xf.xf_rec_type_1;
133 if (XT_REC_IS_EXT_DLOG(ext_rec->tr_rec_type_1)) {133 if (XT_REC_IS_EXT_DLOG(ext_rec->tr_rec_type_1)) {
134 log_id = XT_GET_DISK_2(ext_rec->re_log_id_2);134 log_id = XT_GET_DISK_2(ext_rec->re_log_id_2);
135 log_offset = XT_GET_DISK_6(ext_rec->re_log_offs_6);135 log_offset = XT_GET_DISK_6(ext_rec->re_log_offs_6);
136 }136 }
137 else {137 else {
138 ext_rec = NULL;138 ext_rec = NULL;
139 fix_rec = (XTTabRecFixDPtr) &record->xf.xf_rec_type_1;139 fix_rec = (XTTabRecFixDPtr) &record->xf.xf_rec_type_1;
140 }140 }
141 break;141 break;
142 case XT_LOG_ENT_REC_FREED:142 case XT_LOG_ENT_REC_FREED:
143 case XT_LOG_ENT_REC_REMOVED:143 case XT_LOG_ENT_REC_REMOVED:
144 case XT_LOG_ENT_REC_REMOVED_EXT:144 case XT_LOG_ENT_REC_REMOVED_EXT:
145 op_no = XT_GET_DISK_4(record->fr.fr_op_seq_4);145 op_no = XT_GET_DISK_4(record->fr.fr_op_seq_4);
146 tab_id = XT_GET_DISK_4(record->fr.fr_tab_id_4);146 tab_id = XT_GET_DISK_4(record->fr.fr_tab_id_4);
147 rec_id = XT_GET_DISK_4(record->fr.fr_rec_id_4);147 rec_id = XT_GET_DISK_4(record->fr.fr_rec_id_4);
148 xn_id = XT_GET_DISK_4(record->fr.fr_xact_id_4);148 xn_id = XT_GET_DISK_4(record->fr.fr_xact_id_4);
149 xn_set = TRUE;149 xn_set = TRUE;
150 type="rec";150 type="rec";
151 break;151 break;
152 case XT_LOG_ENT_REC_REMOVED_BI:152 case XT_LOG_ENT_REC_REMOVED_BI:
153 op_no = XT_GET_DISK_4(record->rb.rb_op_seq_4);153 op_no = XT_GET_DISK_4(record->rb.rb_op_seq_4);
154 tab_id = XT_GET_DISK_4(record->rb.rb_tab_id_4);154 tab_id = XT_GET_DISK_4(record->rb.rb_tab_id_4);
155 rec_id = XT_GET_DISK_4(record->rb.rb_rec_id_4);155 rec_id = XT_GET_DISK_4(record->rb.rb_rec_id_4);
156 xn_id = XT_GET_DISK_4(record->rb.rb_xact_id_4);156 xn_id = XT_GET_DISK_4(record->rb.rb_xact_id_4);
157 row_id = XT_GET_DISK_4(record->rb.rb_row_id_4);157 row_id = XT_GET_DISK_4(record->rb.rb_row_id_4);
158 rec_len = XT_GET_DISK_2(record->rb.rb_size_2);158 rec_len = XT_GET_DISK_2(record->rb.rb_size_2);
159 xn_set = TRUE;159 xn_set = TRUE;
160 type="rec";160 type="rec";
161 rec_buf = (XTTabRecExtDPtr) &record->rb.rb_rec_type_1;161 rec_buf = (XTTabRecExtDPtr) &record->rb.rb_rec_type_1;
162 ext_rec = (XTTabRecExtDPtr) &record->rb.rb_rec_type_1;162 ext_rec = (XTTabRecExtDPtr) &record->rb.rb_rec_type_1;
163 if (XT_REC_IS_EXT_DLOG(record->rb.rb_rec_type_1)) {163 if (XT_REC_IS_EXT_DLOG(record->rb.rb_rec_type_1)) {
164 log_id = XT_GET_DISK_2(ext_rec->re_log_id_2);164 log_id = XT_GET_DISK_2(ext_rec->re_log_id_2);
165 log_offset = XT_GET_DISK_6(ext_rec->re_log_offs_6);165 log_offset = XT_GET_DISK_6(ext_rec->re_log_offs_6);
166 }166 }
167 else {167 else {
168 ext_rec = NULL;168 ext_rec = NULL;
169 fix_rec = (XTTabRecFixDPtr) &record->rb.rb_rec_type_1;169 fix_rec = (XTTabRecFixDPtr) &record->rb.rb_rec_type_1;
170 }170 }
171 break;171 break;
172 case XT_LOG_ENT_REC_MOVED:172 case XT_LOG_ENT_REC_MOVED:
173 op_no = XT_GET_DISK_4(record->xw.xw_op_seq_4);173 op_no = XT_GET_DISK_4(record->xw.xw_op_seq_4);
174 tab_id = XT_GET_DISK_4(record->xw.xw_tab_id_4);174 tab_id = XT_GET_DISK_4(record->xw.xw_tab_id_4);
175 rec_id = XT_GET_DISK_4(record->xw.xw_rec_id_4);175 rec_id = XT_GET_DISK_4(record->xw.xw_rec_id_4);
176 log_id = XT_GET_DISK_2(&record->xw.xw_rec_type_1); // This is actually correct176 log_id = XT_GET_DISK_2(&record->xw.xw_rec_type_1); // This is actually correct
177 log_offset = XT_GET_DISK_6(record->xw.xw_next_rec_id_4); // This is actually correct!177 log_offset = XT_GET_DISK_6(record->xw.xw_next_rec_id_4); // This is actually correct!
178 type="rec";178 type="rec";
179 break;179 break;
180 case XT_LOG_ENT_REC_CLEANED:180 case XT_LOG_ENT_REC_CLEANED:
181 case XT_LOG_ENT_REC_CLEANED_1:181 case XT_LOG_ENT_REC_CLEANED_1:
182 case XT_LOG_ENT_REC_UNLINKED:182 case XT_LOG_ENT_REC_UNLINKED:
183 op_no = XT_GET_DISK_4(record->xw.xw_op_seq_4);183 op_no = XT_GET_DISK_4(record->xw.xw_op_seq_4);
184 tab_id = XT_GET_DISK_4(record->xw.xw_tab_id_4);184 tab_id = XT_GET_DISK_4(record->xw.xw_tab_id_4);
185 rec_id = XT_GET_DISK_4(record->xw.xw_rec_id_4);185 rec_id = XT_GET_DISK_4(record->xw.xw_rec_id_4);
186 type="rec";186 type="rec";
187 break;187 break;
188 case XT_LOG_ENT_ROW_NEW:188 case XT_LOG_ENT_ROW_NEW:
189 case XT_LOG_ENT_ROW_NEW_FL:189 case XT_LOG_ENT_ROW_NEW_FL:
190 case XT_LOG_ENT_ROW_ADD_REC:190 case XT_LOG_ENT_ROW_ADD_REC:
191 case XT_LOG_ENT_ROW_SET:191 case XT_LOG_ENT_ROW_SET:
192 case XT_LOG_ENT_ROW_FREED:192 case XT_LOG_ENT_ROW_FREED:
193 op_no = XT_GET_DISK_4(record->xa.xa_op_seq_4);193 op_no = XT_GET_DISK_4(record->xa.xa_op_seq_4);
194 tab_id = XT_GET_DISK_4(record->xa.xa_tab_id_4);194 tab_id = XT_GET_DISK_4(record->xa.xa_tab_id_4);
195 rec_id = XT_GET_DISK_4(record->xa.xa_row_id_4);195 rec_id = XT_GET_DISK_4(record->xa.xa_row_id_4);
196 type="row";196 type="row";
197 break;197 break;
198 case XT_LOG_ENT_NO_OP:198 case XT_LOG_ENT_NO_OP:
199 op_no = XT_GET_DISK_4(record->no.no_op_seq_4);199 op_no = XT_GET_DISK_4(record->no.no_op_seq_4);
200 tab_id = XT_GET_DISK_4(record->no.no_tab_id_4);200 tab_id = XT_GET_DISK_4(record->no.no_tab_id_4);
201 type="-";201 type="-";
202 break;202 break;
203 case XT_LOG_ENT_END_OF_LOG:203 case XT_LOG_ENT_END_OF_LOG:
204 break;204 break;
205 }205 }
206206
207 switch (record->xl.xl_status_1) {207 switch (record->xl.xl_status_1) {
208 case XT_LOG_ENT_HEADER:208 case XT_LOG_ENT_HEADER:
209 rec_type = "HEADER";209 rec_type = "HEADER";
210 break;210 break;
211 case XT_LOG_ENT_NEW_LOG:211 case XT_LOG_ENT_NEW_LOG:
212 rec_type = "NEW LOG";212 rec_type = "NEW LOG";
213 break;213 break;
214 case XT_LOG_ENT_DEL_LOG:214 case XT_LOG_ENT_DEL_LOG:
215 sprintf(buffer, "DEL LOG log=%d ", (int) XT_GET_DISK_4(record->xl.xl_log_id_4));215 sprintf(buffer, "DEL LOG log=%d ", (int) XT_GET_DISK_4(record->xl.xl_log_id_4));
216 rec_type = buffer;216 rec_type = buffer;
217 break;217 break;
218 case XT_LOG_ENT_NEW_TAB:218 case XT_LOG_ENT_NEW_TAB:
219 rec_type = "NEW TABLE";219 rec_type = "NEW TABLE";
220 break;220 break;
221 case XT_LOG_ENT_COMMIT:221 case XT_LOG_ENT_COMMIT:
222 rec_type = "COMMIT";222 rec_type = "COMMIT";
223 xn_id = XT_GET_DISK_4(record->xe.xe_xact_id_4);223 xn_id = XT_GET_DISK_4(record->xe.xe_xact_id_4);
224 xn_set = TRUE;224 xn_set = TRUE;
225 break;225 break;
226 case XT_LOG_ENT_ABORT:226 case XT_LOG_ENT_ABORT:
227 rec_type = "ABORT";227 rec_type = "ABORT";
228 xn_id = XT_GET_DISK_4(record->xe.xe_xact_id_4);228 xn_id = XT_GET_DISK_4(record->xe.xe_xact_id_4);
229 xn_set = TRUE;229 xn_set = TRUE;
230 break;230 break;
231 case XT_LOG_ENT_CLEANUP:231 case XT_LOG_ENT_CLEANUP:
232 rec_type = "CLEANUP";232 rec_type = "CLEANUP";
233 xn_id = XT_GET_DISK_4(record->xc.xc_xact_id_4);233 xn_id = XT_GET_DISK_4(record->xc.xc_xact_id_4);
234 xn_set = TRUE;234 xn_set = TRUE;
235 break;235 break;
236 case XT_LOG_ENT_REC_MODIFIED:236 case XT_LOG_ENT_REC_MODIFIED:
237 rec_type = "MODIFIED";237 rec_type = "MODIFIED";
238 break;238 break;
239 case XT_LOG_ENT_UPDATE:239 case XT_LOG_ENT_UPDATE:
240 rec_type = "UPDATE";240 rec_type = "UPDATE";
241 break;241 break;
242 case XT_LOG_ENT_UPDATE_FL:242 case XT_LOG_ENT_UPDATE_FL:
243 rec_type = "UPDATE-FL";243 rec_type = "UPDATE-FL";
244 break;244 break;
245 case XT_LOG_ENT_INSERT:245 case XT_LOG_ENT_INSERT:
246 rec_type = "INSERT";246 rec_type = "INSERT";
247 break;247 break;
248 case XT_LOG_ENT_INSERT_FL:248 case XT_LOG_ENT_INSERT_FL:
249 rec_type = "INSERT-FL";249 rec_type = "INSERT-FL";
250 break;250 break;
251 case XT_LOG_ENT_DELETE:251 case XT_LOG_ENT_DELETE:
252 rec_type = "DELETE";252 rec_type = "DELETE";
253 break;253 break;
254 case XT_LOG_ENT_DELETE_FL:254 case XT_LOG_ENT_DELETE_FL:
255 rec_type = "DELETE-FL-BG";255 rec_type = "DELETE-FL-BG";
256 break;256 break;
257 case XT_LOG_ENT_UPDATE_BG:257 case XT_LOG_ENT_UPDATE_BG:
258 rec_type = "UPDATE-BG";258 rec_type = "UPDATE-BG";
259 break;259 break;
260 case XT_LOG_ENT_UPDATE_FL_BG:260 case XT_LOG_ENT_UPDATE_FL_BG:
261 rec_type = "UPDATE-FL-BG";261 rec_type = "UPDATE-FL-BG";
262 break;262 break;
263 case XT_LOG_ENT_INSERT_BG:263 case XT_LOG_ENT_INSERT_BG:
264 rec_type = "INSERT-BG";264 rec_type = "INSERT-BG";
265 break;265 break;
266 case XT_LOG_ENT_INSERT_FL_BG:266 case XT_LOG_ENT_INSERT_FL_BG:
267 rec_type = "INSERT-FL-BG";267 rec_type = "INSERT-FL-BG";
268 break;268 break;
269 case XT_LOG_ENT_DELETE_BG:269 case XT_LOG_ENT_DELETE_BG:
270 rec_type = "DELETE-BG";270 rec_type = "DELETE-BG";
271 break;271 break;
272 case XT_LOG_ENT_DELETE_FL_BG:272 case XT_LOG_ENT_DELETE_FL_BG:
273 rec_type = "DELETE-FL-BG";273 rec_type = "DELETE-FL-BG";
274 break;274 break;
275 case XT_LOG_ENT_REC_FREED:275 case XT_LOG_ENT_REC_FREED:
276 rec_type = "FREE REC";276 rec_type = "FREE REC";
277 break;277 break;
278 case XT_LOG_ENT_REC_REMOVED:278 case XT_LOG_ENT_REC_REMOVED:
279 rec_type = "REMOVED REC";279 rec_type = "REMOVED REC";
280 break;280 break;
281 case XT_LOG_ENT_REC_REMOVED_EXT:281 case XT_LOG_ENT_REC_REMOVED_EXT:
282 rec_type = "REMOVED-X REC";282 rec_type = "REMOVED-X REC";
283 break;283 break;
284 case XT_LOG_ENT_REC_REMOVED_BI:284 case XT_LOG_ENT_REC_REMOVED_BI:
285 rec_type = "REMOVED-BI REC";285 rec_type = "REMOVED-BI REC";
286 break;286 break;
287 case XT_LOG_ENT_REC_MOVED:287 case XT_LOG_ENT_REC_MOVED:
288 rec_type = "MOVED REC";288 rec_type = "MOVED REC";
289 break;289 break;
290 case XT_LOG_ENT_REC_CLEANED:290 case XT_LOG_ENT_REC_CLEANED:
291 rec_type = "CLEAN REC";291 rec_type = "CLEAN REC";
292 break;292 break;
293 case XT_LOG_ENT_REC_CLEANED_1:293 case XT_LOG_ENT_REC_CLEANED_1:
294 rec_type = "CLEAN REC-1";294 rec_type = "CLEAN REC-1";
295 break;295 break;
296 case XT_LOG_ENT_REC_UNLINKED:296 case XT_LOG_ENT_REC_UNLINKED:
297 rec_type = "UNLINK REC";297 rec_type = "UNLINK REC";
298 break;298 break;
299 case XT_LOG_ENT_ROW_NEW:299 case XT_LOG_ENT_ROW_NEW:
300 rec_type = "NEW ROW";300 rec_type = "NEW ROW";
301 break;301 break;
302 case XT_LOG_ENT_ROW_NEW_FL:302 case XT_LOG_ENT_ROW_NEW_FL:
303 rec_type = "NEW ROW-FL";303 rec_type = "NEW ROW-FL";
304 break;304 break;
305 case XT_LOG_ENT_ROW_ADD_REC:305 case XT_LOG_ENT_ROW_ADD_REC:
306 rec_type = "REC ADD ROW";306 rec_type = "REC ADD ROW";
307 break;307 break;
308 case XT_LOG_ENT_ROW_SET:308 case XT_LOG_ENT_ROW_SET:
309 rec_type = "SET ROW";309 rec_type = "SET ROW";
310 break;310 break;
311 case XT_LOG_ENT_ROW_FREED:311 case XT_LOG_ENT_ROW_FREED:
312 rec_type = "FREE ROW";312 rec_type = "FREE ROW";
313 break;313 break;
314 case XT_LOG_ENT_OP_SYNC:314 case XT_LOG_ENT_OP_SYNC:
315 rec_type = "OP SYNC";315 rec_type = "OP SYNC";
316 break;316 break;
317 case XT_LOG_ENT_NO_OP:317 case XT_LOG_ENT_NO_OP:
318 rec_type = "NO OP";318 rec_type = "NO OP";
319 break;319 break;
320 case XT_LOG_ENT_END_OF_LOG:320 case XT_LOG_ENT_END_OF_LOG:
321 rec_type = "END OF LOG";321 rec_type = "END OF LOG";
322 break;322 break;
323 }323 }
324324
325 if (log)325 if (log)
326 PRINTF("log=%d offset=%d ", (int) log, (int) offset);326 PRINTF("log=%d offset=%d ", (int) log, (int) offset);
327 PRINTF("%s ", rec_type);327 PRINTF("%s ", rec_type);
328 if (type)328 if (type)
329 PRINTF("op=%lu tab=%lu %s=%lu ", (u_long) op_no, (u_long) tab_id, type, (u_long) rec_id);329 PRINTF("op=%lu tab=%lu %s=%lu ", (u_long) op_no, (u_long) tab_id, type, (u_long) rec_id);
330 if (row_id)330 if (row_id)
331 PRINTF("row=%lu ", (u_long) row_id);331 PRINTF("row=%lu ", (u_long) row_id);
332 if (log_id)332 if (log_id)
333 PRINTF("log=%lu offset=%lu ", (u_long) log_id, (u_long) log_offset);333 PRINTF("log=%lu offset=%lu ", (u_long) log_id, (u_long) log_offset);
334 if (xn_set)334 if (xn_set)
335 PRINTF("xact=%lu ", (u_long) xn_id);335 PRINTF("xact=%lu ", (u_long) xn_id);
336336
337#ifdef TRACE_RECORD_DATA337#ifdef TRACE_RECORD_DATA
338 if (rec_buf) {338 if (rec_buf) {
339 switch (rec_buf->tr_rec_type_1 & XT_TAB_STATUS_MASK) {339 switch (rec_buf->tr_rec_type_1 & XT_TAB_STATUS_MASK) {
340 case XT_TAB_STATUS_FREED:340 case XT_TAB_STATUS_FREED:
341 PRINTF("FREE");341 PRINTF("FREE");
342 break;342 break;
343 case XT_TAB_STATUS_DELETE:343 case XT_TAB_STATUS_DELETE:
344 PRINTF("DELE");344 PRINTF("DELE");
345 break;345 break;
346 case XT_TAB_STATUS_FIXED:346 case XT_TAB_STATUS_FIXED:
347 PRINTF("FIX-");347 PRINTF("FIX-");
348 break;348 break;
349 case XT_TAB_STATUS_VARIABLE:349 case XT_TAB_STATUS_VARIABLE:
350 PRINTF("VAR-");350 PRINTF("VAR-");
351 break;351 break;
352 case XT_TAB_STATUS_EXT_DLOG:352 case XT_TAB_STATUS_EXT_DLOG:
353 PRINTF("EXT-");353 PRINTF("EXT-");
354 break;354 break;
355 }355 }
356 if (rec_buf->tr_rec_type_1 & XT_TAB_STATUS_CLEANED_BIT)356 if (rec_buf->tr_rec_type_1 & XT_TAB_STATUS_CLEANED_BIT)
357 PRINTF("C");357 PRINTF("C");
358 else358 else
359 PRINTF(" ");359 PRINTF(" ");
360 }360 }
361 if (ext_rec) {361 if (ext_rec) {
362 rec_len -= offsetof(XTTabRecExtDRec, re_data);362 rec_len -= offsetof(XTTabRecExtDRec, re_data);
363 xt_print_bytes((xtWord1 *) ext_rec, offsetof(XTTabRecExtDRec, re_data));363 xt_print_bytes((xtWord1 *) ext_rec, offsetof(XTTabRecExtDRec, re_data));
364 PRINTF("| ");364 PRINTF("| ");
365 if (rec_len > 20)365 if (rec_len > 20)
366 rec_len = 20;366 rec_len = 20;
367 xt_print_bytes(ext_rec->re_data, rec_len);367 xt_print_bytes(ext_rec->re_data, rec_len);
368 }368 }
369 if (fix_rec) {369 if (fix_rec) {
370 rec_len -= offsetof(XTTabRecFixDRec, rf_data);370 rec_len -= offsetof(XTTabRecFixDRec, rf_data);
371 xt_print_bytes((xtWord1 *) fix_rec, offsetof(XTTabRecFixDRec, rf_data));371 xt_print_bytes((xtWord1 *) fix_rec, offsetof(XTTabRecFixDRec, rf_data));
372 PRINTF("| ");372 PRINTF("| ");
373 if (rec_len > 20)373 if (rec_len > 20)
374 rec_len = 20;374 rec_len = 20;
375 xt_print_bytes(fix_rec->rf_data, rec_len);375 xt_print_bytes(fix_rec->rf_data, rec_len);
376 }376 }
377#endif377#endif
378378
379 PRINTF("\n");379 PRINTF("\n");
380}380}
381381
382#ifdef DEBUG_PRINT382#ifdef DEBUG_PRINT
383void check_rows(void)383void check_rows(void)
384{384{
385 static XTOpenFilePtr of = NULL;385 static XTOpenFilePtr of = NULL;
386386
387 if (!of)387 if (!of)
388 of = xt_open_file_ns("./test/test_tab-1.xtr", XT_FS_DEFAULT);388 of = xt_open_file_ns("./test/test_tab-1.xtr", XT_FS_DEFAULT);
389 if (of) {389 if (of) {
390 size_t size = (size_t) xt_seek_eof_file(NULL, of);390 size_t size = (size_t) xt_seek_eof_file(NULL, of);
391 xtWord8 *buffer = (xtWord8 *) xt_malloc_ns(size);391 xtWord8 *buffer = (xtWord8 *) xt_malloc_ns(size);
392 xt_pread_file(of, 0, size, size, buffer, NULL);392 xt_pread_file(of, 0, size, size, buffer, NULL);
393 for (size_t i=0; i<size/8; i++) {393 for (size_t i=0; i<size/8; i++) {
394 if (!buffer[i])394 if (!buffer[i])
395 printf("%d is NULL\n", (int) i);395 printf("%d is NULL\n", (int) i);
396 }396 }
397 }397 }
398}398}
399399
400#endif400#endif
401401
402/* ----------------------------------------------------------------------402/* ----------------------------------------------------------------------
403 * APPLYING CHANGES IN SEQUENCE403 * APPLYING CHANGES IN SEQUENCE
404 */404 */
405405
406typedef struct XTOperation {406typedef struct XTOperation {
407 xtOpSeqNo or_op_seq;407 xtOpSeqNo or_op_seq;
408 xtWord4 or_op_len;408 xtWord4 or_op_len;
409 xtLogID or_log_id;409 xtLogID or_log_id;
410 xtLogOffset or_log_offset;410 xtLogOffset or_log_offset;
411} XTOperationRec, *XTOperationPtr;411} XTOperationRec, *XTOperationPtr;
412412
413static int xres_cmp_op_seq(struct XTThread *XT_UNUSED(self), register const void *XT_UNUSED(thunk), register const void *a, register const void *b)413static int xres_cmp_op_seq(struct XTThread *XT_UNUSED(self), register const void *XT_UNUSED(thunk), register const void *a, register const void *b)
414{414{
415 xtOpSeqNo lf_op_seq = *((xtOpSeqNo *) a);415 xtOpSeqNo lf_op_seq = *((xtOpSeqNo *) a);
416 XTOperationPtr lf_ptr = (XTOperationPtr) b;416 XTOperationPtr lf_ptr = (XTOperationPtr) b;
417417
418 if (lf_op_seq == lf_ptr->or_op_seq)418 if (lf_op_seq == lf_ptr->or_op_seq)
419 return 0;419 return 0;
420 if (XTTableSeq::xt_op_is_before(lf_op_seq, lf_ptr->or_op_seq))420 if (XTTableSeq::xt_op_is_before(lf_op_seq, lf_ptr->or_op_seq))
421 return -1;421 return -1;
422 return 1;422 return 1;
423}423}
424424
425xtPublic void xt_xres_init_tab(XTThreadPtr self, XTTableHPtr tab)425xtPublic void xt_xres_init_tab(XTThreadPtr self, XTTableHPtr tab)
426{426{
427 tab->tab_op_list = xt_new_sortedlist(self, sizeof(XTOperationRec), 20, 1000, xres_cmp_op_seq, NULL, NULL, TRUE, FALSE);427 tab->tab_op_list = xt_new_sortedlist(self, sizeof(XTOperationRec), 20, 1000, xres_cmp_op_seq, NULL, NULL, TRUE, FALSE);
428}428}
429429
430xtPublic void xt_xres_exit_tab(XTThreadPtr self, XTTableHPtr tab)430xtPublic void xt_xres_exit_tab(XTThreadPtr self, XTTableHPtr tab)
431{431{
432 if (tab->tab_op_list) {432 if (tab->tab_op_list) {
433 xt_free_sortedlist(self, tab->tab_op_list);433 xt_free_sortedlist(self, tab->tab_op_list);
434 tab->tab_op_list = NULL;434 tab->tab_op_list = NULL;
435 }435 }
436}436}
437437
438static xtBool xres_open_table(XTThreadPtr self, XTWriterStatePtr ws, xtTableID tab_id)438static xtBool xres_open_table(XTThreadPtr self, XTWriterStatePtr ws, xtTableID tab_id)
439{439{
440 XTOpenTablePtr ot;440 XTOpenTablePtr ot;
441441
442 if ((ot = ws->ws_ot)) {442 if ((ot = ws->ws_ot)) {
443 if (ot->ot_table->tab_id == tab_id)443 if (ot->ot_table->tab_id == tab_id)
444 return OK;444 return OK;
445 xt_db_return_table_to_pool(self, ot);445 xt_db_return_table_to_pool(self, ot);
446 ws->ws_ot = NULL;446 ws->ws_ot = NULL;
447 }447 }
448448
449 if (ws->ws_tab_gone == tab_id)449 if (ws->ws_tab_gone == tab_id)
450 return FAILED;450 return FAILED;
451 if ((ws->ws_ot = xt_db_open_pool_table(self, ws->ws_db, tab_id, NULL, TRUE))) {451 if ((ws->ws_ot = xt_db_open_pool_table(self, ws->ws_db, tab_id, NULL, TRUE))) {
452 XTTableHPtr tab;452 XTTableHPtr tab;
453453
454 tab = ws->ws_ot->ot_table;454 tab = ws->ws_ot->ot_table;
455 if (!tab->tab_ind_rec_log_id) {455 if (!tab->tab_ind_rec_log_id) {
456 /* Should not happen... */456 /* Should not happen... */
457 tab->tab_ind_rec_log_id = ws->ws_ind_rec_log_id;457 tab->tab_ind_rec_log_id = ws->ws_ind_rec_log_id;
458 tab->tab_ind_rec_log_offset = ws->ws_ind_rec_log_offset;458 tab->tab_ind_rec_log_offset = ws->ws_ind_rec_log_offset;
459 }459 }
460 return OK;460 return OK;
461 }461 }
462 ws->ws_tab_gone = tab_id;462 ws->ws_tab_gone = tab_id;
463 return FAILED;463 return FAILED;
464}464}
465465
466/* {INDEX-RECOV_ROWID}466/* {INDEX-RECOV_ROWID}
467 * Add missing index entries during recovery.467 * Add missing index entries during recovery.
468 * Set the row ID even if the index entry468 * Set the row ID even if the index entry
469 * is not committed. It will be removed later by469 * is not committed. It will be removed later by
470 * the sweeper.470 * the sweeper.
471 */471 */
472static xtBool xres_add_index_entries(XTOpenTablePtr ot, xtRowID row_id, xtRecordID rec_id, xtWord1 *rec_data)472static xtBool xres_add_index_entries(XTOpenTablePtr ot, xtRowID row_id, xtRecordID rec_id, xtWord1 *rec_data)
473{473{
474 XTTableHPtr tab = ot->ot_table;474 XTTableHPtr tab = ot->ot_table;
475 u_int idx_cnt;475 u_int idx_cnt;
476 XTIndexPtr *ind;476 XTIndexPtr *ind;
477 //XTIdxSearchKeyRec key;477 //XTIdxSearchKeyRec key;
478478
479 if (tab->tab_dic.dic_disable_index)479 if (tab->tab_dic.dic_disable_index)
480 return OK;480 return OK;
481481
482 for (idx_cnt=0, ind=tab->tab_dic.dic_keys; idx_cnt<tab->tab_dic.dic_key_count; idx_cnt++, ind++) {482 for (idx_cnt=0, ind=tab->tab_dic.dic_keys; idx_cnt<tab->tab_dic.dic_key_count; idx_cnt++, ind++) {
483 if (!xt_idx_insert(ot, *ind, row_id, rec_id, rec_data, NULL, TRUE)) {483 if (!xt_idx_insert(ot, *ind, row_id, rec_id, rec_data, NULL, TRUE)) {
484 /* Check the error, certain errors are recoverable! */484 /* Check the error, certain errors are recoverable! */
485 XTThreadPtr self = xt_get_self();485 XTThreadPtr self = xt_get_self();
486486
487 if (self->t_exception.e_xt_err == XT_SYSTEM_ERROR &&487 if (self->t_exception.e_xt_err == XT_SYSTEM_ERROR &&
488 (XT_FILE_IN_USE(self->t_exception.e_sys_err) ||488 (XT_FILE_IN_USE(self->t_exception.e_sys_err) ||
489 XT_FILE_ACCESS_DENIED(self->t_exception.e_sys_err) ||489 XT_FILE_ACCESS_DENIED(self->t_exception.e_sys_err) ||
490 XT_FILE_TOO_MANY_OPEN(self->t_exception.e_sys_err) ||490 XT_FILE_TOO_MANY_OPEN(self->t_exception.e_sys_err) ||
491 self->t_exception.e_sys_err == XT_ENOMEM)) {491 self->t_exception.e_sys_err == XT_ENOMEM)) {
492 ot->ot_err_index_no = (*ind)->mi_index_no;492 ot->ot_err_index_no = (*ind)->mi_index_no;
493 return FAILED;493 return FAILED;
494 }494 }
495495
496 /* TODO: Write something to the index header to indicate that496 /* TODO: Write something to the index header to indicate that
497 * it is corrupted.497 * it is corrupted.
498 */498 */
499 tab->tab_dic.dic_disable_index = XT_INDEX_CORRUPTED;499 tab->tab_dic.dic_disable_index = XT_INDEX_CORRUPTED;
500 xt_log_and_clear_exception_ns();500 xt_log_and_clear_exception_ns();
501 return OK;501 return OK;
502 }502 }
503 }503 }
504 return OK;504 return OK;
505}505}
506506
507static void xres_remove_index_entries(XTOpenTablePtr ot, xtRecordID rec_id, xtWord1 *rec_data)507static void xres_remove_index_entries(XTOpenTablePtr ot, xtRecordID rec_id, xtWord1 *rec_data)
508{508{
509 XTTableHPtr tab = ot->ot_table;509 XTTableHPtr tab = ot->ot_table;
510 u_int idx_cnt;510 u_int idx_cnt;
511 XTIndexPtr *ind;511 XTIndexPtr *ind;
512512
513 if (tab->tab_dic.dic_disable_index)513 if (tab->tab_dic.dic_disable_index)
514 return;514 return;
515515
516 for (idx_cnt=0, ind=tab->tab_dic.dic_keys; idx_cnt<tab->tab_dic.dic_key_count; idx_cnt++, ind++) {516 for (idx_cnt=0, ind=tab->tab_dic.dic_keys; idx_cnt<tab->tab_dic.dic_key_count; idx_cnt++, ind++) {
517 if (!xt_idx_delete(ot, *ind, rec_id, rec_data))517 if (!xt_idx_delete(ot, *ind, rec_id, rec_data))
518 xt_log_and_clear_exception_ns();518 xt_log_and_clear_exception_ns();
519 }519 }
520}520}
521521
522static xtWord1 *xres_load_record(XTThreadPtr self, XTOpenTablePtr ot, xtRecordID rec_id, xtWord1 *data, size_t red_size, XTInfoBufferPtr rec_buf, u_int cols_req)522static xtWord1 *xres_load_record(XTThreadPtr self, XTOpenTablePtr ot, xtRecordID rec_id, xtWord1 *data, size_t red_size, XTInfoBufferPtr rec_buf, u_int cols_req)
523{523{
524 XTTableHPtr tab = ot->ot_table;524 XTTableHPtr tab = ot->ot_table;
525 xtWord1 *rec_data;525 xtWord1 *rec_data;
526526
527 rec_data = ot->ot_row_rbuffer;527 rec_data = ot->ot_row_rbuffer;
528528
529 ASSERT(red_size <= ot->ot_row_rbuf_size);529 ASSERT(red_size <= ot->ot_row_rbuf_size);
530 ASSERT(tab->tab_dic.dic_rec_size <= ot->ot_row_rbuf_size);530 ASSERT(tab->tab_dic.dic_rec_size <= ot->ot_row_rbuf_size);
531 if (data) {531 if (data) {
532 if (rec_data != data)532 if (rec_data != data)
533 memcpy(rec_data, data, red_size);533 memcpy(rec_data, data, red_size);
534 }534 }
535 else {535 else {
536 /* It can be that less than 'dic_rec_size' was written for536 /* It can be that less than 'dic_rec_size' was written for
537 * variable length type records.537 * variable length type records.
538 * If this is the last record in the file, then we will read538 * If this is the last record in the file, then we will read
539 * less than actual record size.539 * less than actual record size.
540 */540 */
541 if (!XT_PREAD_RR_FILE(ot->ot_rec_file, xt_rec_id_to_rec_offset(tab, rec_id), tab->tab_dic.dic_rec_size, 0, rec_data, &red_size, &self->st_statistics.st_rec, self))541 if (!XT_PREAD_RR_FILE(ot->ot_rec_file, xt_rec_id_to_rec_offset(tab, rec_id), tab->tab_dic.dic_rec_size, 0, rec_data, &red_size, &self->st_statistics.st_rec, self))
542 goto failed;542 goto failed;
543 543
544 if (red_size < sizeof(XTTabRecHeadDRec))544 if (red_size < sizeof(XTTabRecHeadDRec))
545 return NULL;545 return NULL;
546 }546 }
547 547
548 if (XT_REC_IS_FIXED(rec_data[0]))548 if (XT_REC_IS_FIXED(rec_data[0]))
549 rec_data = ot->ot_row_rbuffer + XT_REC_FIX_HEADER_SIZE;549 rec_data = ot->ot_row_rbuffer + XT_REC_FIX_HEADER_SIZE;
550 else {550 else {
551 if (!xt_ib_alloc(NULL, rec_buf, tab->tab_dic.dic_mysql_buf_size))551 if (!xt_ib_alloc(NULL, rec_buf, tab->tab_dic.dic_mysql_buf_size))
552 goto failed;552 goto failed;
553 if (XT_REC_IS_VARIABLE(rec_data[0])) {553 if (XT_REC_IS_VARIABLE(rec_data[0])) {
554 if (!myxt_load_row(ot, rec_data + XT_REC_FIX_HEADER_SIZE, rec_buf->ib_db.db_data, cols_req))554 if (!myxt_load_row(ot, rec_data + XT_REC_FIX_HEADER_SIZE, rec_buf->ib_db.db_data, cols_req))
555 goto failed;555 goto failed;
556 }556 }
557 else if (XT_REC_IS_EXT_DLOG(rec_data[0])) {557 else if (XT_REC_IS_EXT_DLOG(rec_data[0])) {
558 if (red_size < XT_REC_EXT_HEADER_SIZE)558 if (red_size < XT_REC_EXT_HEADER_SIZE)
559 return NULL;559 return NULL;
560560
561 ASSERT(cols_req);561 ASSERT(cols_req);
562 if (cols_req && cols_req <= tab->tab_dic.dic_fix_col_count) {562 if (cols_req && cols_req <= tab->tab_dic.dic_fix_col_count) {
563 if (!myxt_load_row(ot, rec_data + XT_REC_EXT_HEADER_SIZE, rec_buf->ib_db.db_data, cols_req))563 if (!myxt_load_row(ot, rec_data + XT_REC_EXT_HEADER_SIZE, rec_buf->ib_db.db_data, cols_req))
564 goto failed;564 goto failed;
565 }565 }
566 else {566 else {
567 if (!xt_tab_load_ext_data(ot, rec_id, rec_buf->ib_db.db_data, cols_req))567 if (!xt_tab_load_ext_data(ot, rec_id, rec_buf->ib_db.db_data, cols_req))
568 goto failed;568 goto failed;
569 }569 }
570 }570 }
571 else571 else
572 /* This is possible, the record has already been cleaned up. */572 /* This is possible, the record has already been cleaned up. */
573 return NULL;573 return NULL;
574 rec_data = rec_buf->ib_db.db_data;574 rec_data = rec_buf->ib_db.db_data;
575 }575 }
576576
577 return rec_data;577 return rec_data;
578578
579 failed:579 failed:
580 /* Running out of memory should not be ignored. */580 /* Running out of memory should not be ignored. */
581 if (self->t_exception.e_xt_err == XT_SYSTEM_ERROR &&581 if (self->t_exception.e_xt_err == XT_SYSTEM_ERROR &&
582 self->t_exception.e_sys_err == XT_ENOMEM)582 self->t_exception.e_sys_err == XT_ENOMEM)
583 xt_throw(self);583 xt_throw(self);
584 xt_log_and_clear_exception_ns();584 xt_log_and_clear_exception_ns();
585 return NULL;585 return NULL;
586}586}
587587
/*
 * Apply a change from the log.
 *
 * This function is basically very straightforward, were it not
 * for the option to apply operations out of sequence
 * (i.e. in_sequence == FALSE).
 *
 * If operations are applied in sequence, then they can be
 * applied blindly. The update operation is just executed as
 * it was logged.
 *
 * If the changes are not in sequence, then some operations are missing;
 * however, the operations that are present are in the correct order.
 *
 * This can only happen at the end of recovery!!!
 * After we have applied all operations in the log we may be
 * left with some operations that have not been applied
 * because operations were logged out of sequence.
 *
 * The application of these operations therefore has to take into
 * account the current state of the database.
 * They are then applied in a manner that maintains the
 * database consistency.
 *
 * For example, a record that is freed is freed by placing it
 * on the current free list. Part of the data logged for the
 * operation is ignored, namely the "next block" pointer
 * that was originally written into the freed record.
 */
617static void xres_apply_change(XTThreadPtr self, XTOpenTablePtr ot, XTXactLogBufferDPtr record, xtBool in_sequence, xtBool check_index, XTInfoBufferPtr rec_buf)617static void xres_apply_change(XTThreadPtr self, XTOpenTablePtr ot, XTXactLogBufferDPtr record, xtBool in_sequence, xtBool check_index, XTInfoBufferPtr rec_buf)
618{618{
619 XTTableHPtr tab = ot->ot_table;619 XTTableHPtr tab = ot->ot_table;
620 size_t len;620 size_t len;
621 xtRecordID rec_id;621 xtRecordID rec_id;
622 xtRefID free_ref_id;622 xtRefID free_ref_id;
623 XTTabRecFreeDRec free_rec;623 XTTabRecFreeDRec free_rec;
624 xtRowID row_id;624 xtRowID row_id;
625 XTTabRowRefDRec row_buf;625 XTTabRowRefDRec row_buf;
626 XTTabRecHeadDRec rec_head;626 XTTabRecHeadDRec rec_head;
627 size_t tfer;627 size_t tfer;
628 xtRecordID link_rec_id, prev_link_rec_id;628 xtRecordID link_rec_id, prev_link_rec_id;
629 xtWord1 *rec_data = NULL;629 xtWord1 *rec_data = NULL;
630 XTTabRecFreeDPtr free_data;630 XTTabRecFreeDPtr free_data;
631631
632 switch (record->xl.xl_status_1) {632 switch (record->xl.xl_status_1) {
633 case XT_LOG_ENT_REC_MODIFIED:633 case XT_LOG_ENT_REC_MODIFIED:
634 case XT_LOG_ENT_UPDATE:634 case XT_LOG_ENT_UPDATE:
635 case XT_LOG_ENT_INSERT:635 case XT_LOG_ENT_INSERT:
636 case XT_LOG_ENT_DELETE:636 case XT_LOG_ENT_DELETE:
637 case XT_LOG_ENT_UPDATE_BG:637 case XT_LOG_ENT_UPDATE_BG:
638 case XT_LOG_ENT_INSERT_BG:638 case XT_LOG_ENT_INSERT_BG:
639 case XT_LOG_ENT_DELETE_BG:639 case XT_LOG_ENT_DELETE_BG:
640 rec_id = XT_GET_DISK_4(record->xu.xu_rec_id_4);640 rec_id = XT_GET_DISK_4(record->xu.xu_rec_id_4);
641 len = (size_t) XT_GET_DISK_2(record->xu.xu_size_2);641
642 if (!XT_PWRITE_RR_FILE(ot->ot_rec_file, xt_rec_id_to_rec_offset(tab, rec_id), len, (xtWord1 *) &record->xu.xu_rec_type_1, &ot->ot_thread->st_statistics.st_rec, ot->ot_thread))642 /* This should be done before we apply change to table, as otherwise we lose
643 xt_throw(self);643 * the key value that we need to remove from index
644 tab->tab_bytes_to_flush += len;644 */
645645 if (check_index && ot->ot_table->tab_dic.dic_key_count && record->xl.xl_status_1 == XT_LOG_ENT_REC_MODIFIED) {
646 if (check_index && ot->ot_table->tab_dic.dic_key_count) {646 if ((rec_data = xres_load_record(self, ot, rec_id, NULL, 0, rec_buf, tab->tab_dic.dic_ind_cols_req)))
647 switch (record->xl.xl_status_1) {647 xres_remove_index_entries(ot, rec_id, rec_data);
648 case XT_LOG_ENT_DELETE:648 }
649 case XT_LOG_ENT_DELETE_BG:649
650 break;650 len = (size_t) XT_GET_DISK_2(record->xu.xu_size_2);
651 case XT_LOG_ENT_REC_MODIFIED:651 if (!XT_PWRITE_RR_FILE(ot->ot_rec_file, xt_rec_id_to_rec_offset(tab, rec_id), len, (xtWord1 *) &record->xu.xu_rec_type_1, &ot->ot_thread->st_statistics.st_rec, ot->ot_thread))
652 if ((rec_data = xres_load_record(self, ot, rec_id, NULL, 0, rec_buf, tab->tab_dic.dic_ind_cols_req)))652 xt_throw(self);
653 xres_remove_index_entries(ot, rec_id, rec_data);653 tab->tab_bytes_to_flush += len;
654 /* No break required: */654
655 default:655 if (check_index && ot->ot_table->tab_dic.dic_key_count) {
656 if ((rec_data = xres_load_record(self, ot, rec_id, &record->xu.xu_rec_type_1, len, rec_buf, tab->tab_dic.dic_ind_cols_req))) {656 switch (record->xl.xl_status_1) {
657 row_id = XT_GET_DISK_4(record->xu.xu_row_id_4);657 case XT_LOG_ENT_DELETE:
658 if (!xres_add_index_entries(ot, row_id, rec_id, rec_data))658 case XT_LOG_ENT_DELETE_BG:
659 xt_throw(self);659 break;
660 }660 default:
661 break;661 if ((rec_data = xres_load_record(self, ot, rec_id, &record->xu.xu_rec_type_1, len, rec_buf, tab->tab_dic.dic_ind_cols_req))) {
662 }662 row_id = XT_GET_DISK_4(record->xu.xu_row_id_4);
663 }663 if (!xres_add_index_entries(ot, row_id, rec_id, rec_data))
664664 xt_throw(self);
665 if (!in_sequence) {665 }
666 /* A record has been allocated from the EOF, but out of sequence.666 break;
667 * This could leave a gap where other records were allocated667 }
668 * from the EOF, but those operations have been lost!668 }
669 * We compensate for this by adding all blocks between669
670 * to the free list.670 if (!in_sequence) {
671 */671 /* A record has been allocated from the EOF, but out of sequence.
672 free_rec.rf_rec_type_1 = XT_TAB_STATUS_FREED;672 * This could leave a gap where other records were allocated
673 free_rec.rf_not_used_1 = 0;673 * from the EOF, but those operations have been lost!
674 while (tab->tab_head_rec_eof_id < rec_id) {674 * We compensate for this by adding all blocks between
675 XT_SET_DISK_4(free_rec.rf_next_rec_id_4, tab->tab_head_rec_free_id);675 * to the free list.
676 if (!XT_PWRITE_RR_FILE(ot->ot_rec_file, tab->tab_head_rec_eof_id, sizeof(XTTabRecFreeDRec), (xtWord1 *) &free_rec, &ot->ot_thread->st_statistics.st_rec, ot->ot_thread))676 */
677 xt_throw(self);677 free_rec.rf_rec_type_1 = XT_TAB_STATUS_FREED;
678 tab->tab_bytes_to_flush += sizeof(XTTabRecFreeDRec);678 free_rec.rf_not_used_1 = 0;
679 tab->tab_head_rec_free_id = tab->tab_head_rec_eof_id;679 while (tab->tab_head_rec_eof_id < rec_id) {
680 tab->tab_head_rec_eof_id++;680 XT_SET_DISK_4(free_rec.rf_next_rec_id_4, tab->tab_head_rec_free_id);
681 }681 if (!XT_PWRITE_RR_FILE(ot->ot_rec_file, tab->tab_head_rec_eof_id, sizeof(XTTabRecFreeDRec), (xtWord1 *) &free_rec, &ot->ot_thread->st_statistics.st_rec, ot->ot_thread))
682 }682 xt_throw(self);
683 if (tab->tab_head_rec_eof_id < rec_id + 1)683 tab->tab_bytes_to_flush += sizeof(XTTabRecFreeDRec);
684 tab->tab_head_rec_eof_id = rec_id + 1;684 tab->tab_head_rec_free_id = tab->tab_head_rec_eof_id;
685 tab->tab_flush_pending = TRUE;685 tab->tab_head_rec_eof_id++;
686 break;686 }
687 case XT_LOG_ENT_UPDATE_FL:687 }
688 case XT_LOG_ENT_INSERT_FL:688 if (tab->tab_head_rec_eof_id < rec_id + 1)
689 case XT_LOG_ENT_DELETE_FL:689 tab->tab_head_rec_eof_id = rec_id + 1;
690 case XT_LOG_ENT_UPDATE_FL_BG:690 tab->tab_flush_pending = TRUE;
691 case XT_LOG_ENT_INSERT_FL_BG:691 break;
692 case XT_LOG_ENT_DELETE_FL_BG:692 case XT_LOG_ENT_UPDATE_FL:
693 rec_id = XT_GET_DISK_4(record->xf.xf_rec_id_4);693 case XT_LOG_ENT_INSERT_FL:
694 len = (size_t) XT_GET_DISK_2(record->xf.xf_size_2);694 case XT_LOG_ENT_DELETE_FL:
695 free_ref_id = XT_GET_DISK_4(record->xf.xf_free_rec_id_4);695 case XT_LOG_ENT_UPDATE_FL_BG:
696696 case XT_LOG_ENT_INSERT_FL_BG:
697 if (check_index &&697 case XT_LOG_ENT_DELETE_FL_BG:
698 record->xf.xf_status_1 != XT_LOG_ENT_DELETE_FL &&698 rec_id = XT_GET_DISK_4(record->xf.xf_rec_id_4);
699 record->xf.xf_status_1 != XT_LOG_ENT_DELETE_FL_BG) {699 len = (size_t) XT_GET_DISK_2(record->xf.xf_size_2);
700 if ((rec_data = xres_load_record(self, ot, rec_id, &record->xf.xf_rec_type_1, len, rec_buf, tab->tab_dic.dic_ind_cols_req))) {700 free_ref_id = XT_GET_DISK_4(record->xf.xf_free_rec_id_4);
701 row_id = XT_GET_DISK_4(record->xf.xf_row_id_4);701
702 if (!xres_add_index_entries(ot, row_id, rec_id, rec_data))702 if (check_index &&
703 xt_throw(self);703 record->xf.xf_status_1 != XT_LOG_ENT_DELETE_FL &&
704 }704 record->xf.xf_status_1 != XT_LOG_ENT_DELETE_FL_BG) {
705 }705 if ((rec_data = xres_load_record(self, ot, rec_id, &record->xf.xf_rec_type_1, len, rec_buf, tab->tab_dic.dic_ind_cols_req))) {
706706 row_id = XT_GET_DISK_4(record->xf.xf_row_id_4);
707 if (!in_sequence) {707 if (!xres_add_index_entries(ot, row_id, rec_id, rec_data))
708 /* This record was allocated from the free list.708 xt_throw(self);
709 * Because this operation is out of sequence, there709 }
710 * could have been other allocations from the710 }
711 * free list before this, that have gone missing.711
712 * For this reason we have to search the current712 if (!in_sequence) {
713 * free list and remove the record.713 /* This record was allocated from the free list.
714 */714 * Because this operation is out of sequence, there
715 link_rec_id = tab->tab_head_rec_free_id;715 * could have been other allocations from the
716 prev_link_rec_id = 0;716 * free list before this, that have gone missing.
717 while (link_rec_id) {717 * For this reason we have to search the current
718 if (!XT_PREAD_RR_FILE(ot->ot_rec_file, xt_rec_id_to_rec_offset(tab, link_rec_id), sizeof(XTTabRecFreeDRec), sizeof(XTTabRecFreeDRec), (xtWord1 *) &free_rec, NULL, &self->st_statistics.st_rec, self))718 * free list and remove the record.
719 xt_throw(self);719 */
720 if (link_rec_id == rec_id)720 link_rec_id = tab->tab_head_rec_free_id;
721 break;721 prev_link_rec_id = 0;
722 prev_link_rec_id = link_rec_id;722 while (link_rec_id) {
723 link_rec_id = XT_GET_DISK_4(free_rec.rf_next_rec_id_4);723 if (!XT_PREAD_RR_FILE(ot->ot_rec_file, xt_rec_id_to_rec_offset(tab, link_rec_id), sizeof(XTTabRecFreeDRec), sizeof(XTTabRecFreeDRec), (xtWord1 *) &free_rec, NULL, &self->st_statistics.st_rec, self))
724 }724 xt_throw(self);
725 if (link_rec_id == rec_id) {725 if (link_rec_id == rec_id)
726 /* The block was found on the free list.726 break;
727 * remove it: */727 prev_link_rec_id = link_rec_id;
728 if (prev_link_rec_id) {728 link_rec_id = XT_GET_DISK_4(free_rec.rf_next_rec_id_4);
729 /* We write the record from position 'link_rec_id' into729 }
730 * position 'prev_link_rec_id'. This unlinks 'link_rec_id'!730 if (link_rec_id == rec_id) {
731 */731 /* The block was found on the free list.
732 if (!XT_PWRITE_RR_FILE(ot->ot_rec_file, xt_rec_id_to_rec_offset(tab, prev_link_rec_id), sizeof(XTTabRecFreeDRec), (xtWord1 *) &free_rec, &ot->ot_thread->st_statistics.st_rec, ot->ot_thread))732 * remove it: */
733 xt_throw(self);733 if (prev_link_rec_id) {
734 tab->tab_bytes_to_flush += sizeof(XTTabRecFreeDRec);734 /* We write the record from position 'link_rec_id' into
735 free_ref_id = tab->tab_head_rec_free_id;735 * position 'prev_link_rec_id'. This unlinks 'link_rec_id'!
736 }736 */
737 else737 if (!XT_PWRITE_RR_FILE(ot->ot_rec_file, xt_rec_id_to_rec_offset(tab, prev_link_rec_id), sizeof(XTTabRecFreeDRec), (xtWord1 *) &free_rec, &ot->ot_thread->st_statistics.st_rec, ot->ot_thread))
738 /* The block is at the front of the list: */738 xt_throw(self);
739 free_ref_id = XT_GET_DISK_4(free_rec.rf_next_rec_id_4);739 tab->tab_bytes_to_flush += sizeof(XTTabRecFreeDRec);
740 }740 free_ref_id = tab->tab_head_rec_free_id;
741 else {741 }
742 /* Not found on the free list? */742 else
743 if (tab->tab_head_rec_eof_id < rec_id + 1)743 /* The block is at the front of the list: */
744 tab->tab_head_rec_eof_id = rec_id + 1;744 free_ref_id = XT_GET_DISK_4(free_rec.rf_next_rec_id_4);
745 goto write_mod_data;745 }
746 }746 else {
747 }747 /* Not found on the free list? */
748 if (tab->tab_head_rec_eof_id < rec_id + 1)748 if (tab->tab_head_rec_eof_id < rec_id + 1)
749 tab->tab_head_rec_eof_id = rec_id + 1;749 tab->tab_head_rec_eof_id = rec_id + 1;
750 tab->tab_head_rec_free_id = free_ref_id;750 goto write_mod_data;
751 tab->tab_head_rec_fnum--;751 }
752 write_mod_data:752 }
753 if (!XT_PWRITE_RR_FILE(ot->ot_rec_file, xt_rec_id_to_rec_offset(tab, rec_id), len, (xtWord1 *) &record->xf.xf_rec_type_1, &ot->ot_thread->st_statistics.st_rec, ot->ot_thread))753 if (tab->tab_head_rec_eof_id < rec_id + 1)
754 xt_throw(self);754 tab->tab_head_rec_eof_id = rec_id + 1;
755 tab->tab_bytes_to_flush += len;755 tab->tab_head_rec_free_id = free_ref_id;
756 tab->tab_flush_pending = TRUE;756 tab->tab_head_rec_fnum--;
757 break;757 write_mod_data:
758 case XT_LOG_ENT_REC_REMOVED:758 if (!XT_PWRITE_RR_FILE(ot->ot_rec_file, xt_rec_id_to_rec_offset(tab, rec_id), len, (xtWord1 *) &record->xf.xf_rec_type_1, &ot->ot_thread->st_statistics.st_rec, ot->ot_thread))
759 case XT_LOG_ENT_REC_REMOVED_EXT: {759 xt_throw(self);
760 xtBool record_loaded;760 tab->tab_bytes_to_flush += len;
761 XTTabRecExtDPtr ext_rec;761 tab->tab_flush_pending = TRUE;
762 size_t red_size;762 break;
763 xtWord4 log_over_size = 0;763 case XT_LOG_ENT_REC_REMOVED:
764 xtLogID data_log_id = 0;764 case XT_LOG_ENT_REC_REMOVED_EXT: {
765 xtLogOffset data_log_offset = 0;765 xtBool record_loaded;
766 u_int cols_required = 0;766 XTTabRecExtDPtr ext_rec;
767767 size_t red_size;
768 rec_id = XT_GET_DISK_4(record->fr.fr_rec_id_4);768 xtWord4 log_over_size = 0;
769 free_data = (XTTabRecFreeDPtr) &record->fr.fr_rec_type_1;769 xtLogID data_log_id = 0;
770770 xtLogOffset data_log_offset = 0;
771 /* This is a short-cut, it does not require loading the record: */771 u_int cols_required = 0;
772 if (!check_index && !tab->tab_dic.dic_blob_count && record->fr.fr_status_1 != XT_LOG_ENT_REC_REMOVED_EXT)772
773 goto do_rec_freed;773 rec_id = XT_GET_DISK_4(record->fr.fr_rec_id_4);
774774 free_data = (XTTabRecFreeDPtr) &record->fr.fr_rec_type_1;
775 ext_rec = (XTTabRecExtDPtr) ot->ot_row_rbuffer;775
776776 /* This is a short-cut, it does not require loading the record: */
777 if (!XT_PREAD_RR_FILE(ot->ot_rec_file, xt_rec_id_to_rec_offset(tab, rec_id), tab->tab_dic.dic_rec_size, 0, ext_rec, &red_size, &self->st_statistics.st_rec, self)) {777 if (!check_index && !tab->tab_dic.dic_blob_count && record->fr.fr_status_1 != XT_LOG_ENT_REC_REMOVED_EXT)
778 xt_log_and_clear_exception_ns();778 goto do_rec_freed;
779 goto do_rec_freed;779
780 }780 ext_rec = (XTTabRecExtDPtr) ot->ot_row_rbuffer;
781781
782 if (red_size < sizeof(XTTabRecHeadDRec))782 if (!XT_PREAD_RR_FILE(ot->ot_rec_file, xt_rec_id_to_rec_offset(tab, rec_id), tab->tab_dic.dic_rec_size, 0, ext_rec, &red_size, &self->st_statistics.st_rec, self)) {
783 goto do_rec_freed;783 xt_log_and_clear_exception_ns();
784784 goto do_rec_freed;
785 /* Check that the record is the same as the one originally removed.785 }
786 * This can be different if recovery is repeated.786
787 * For example:787 if (red_size < sizeof(XTTabRecHeadDRec))
788 * 788 goto do_rec_freed;
789 * log=21 offset=6304472 REMOVED-X REC op=360616 tab=7 rec=25874 789
790 * log=21 offset=6309230 UPDATE-FL op=360618 tab=7 rec=25874 row=26667 log=1 offset=26503077 xact=209 790 /* Check that the record is the same as the one originally removed.
791 * log=21 offset=6317500 CLEAN REC op=360631 tab=7 rec=25874 791 * This can be different if recovery is repeated.
792 * 792 * For example:
793 * If this recovery sequence is repeated, then the REMOVED-X will free the793 *
794 * extended record belonging to the update that came afterwards!794 * log=21 offset=6304472 REMOVED-X REC op=360616 tab=7 rec=25874
795 *795 * log=21 offset=6309230 UPDATE-FL op=360618 tab=7 rec=25874 row=26667 log=1 offset=26503077 xact=209
796 * Additional situation to consider:796 * log=21 offset=6317500 CLEAN REC op=360631 tab=7 rec=25874
797 *797 *
798 * - A record "x" is created, and index entries created.798 * If this recovery sequence is repeated, then the REMOVED-X will free the
799 * - A checkpoint is made done.799 * extended record belonging to the update that came afterwards!
800 * - Record "x" is deleted due to UPDATE.800 *
801 * - The index entries are removed, but the index is not801 * Additional situation to consider:
802 * flushed.802 *
803 * - This deletion is written to disk by the writer.803 * - A record "x" is created, and index entries created.
804 * So we have the situation that the remove is on disk,804 * - A checkpoint is made done.
805 * but the index changes have not been made.805 * - Record "x" is deleted due to UPDATE.
806 *806 * - The index entries are removed, but the index is not
807 * In this case, skipping to "do_rec_freed" is incorrect.807 * flushed.
808 */808 * - This deletion is written to disk by the writer.
809 if (record->fr.fr_stat_id_1 != ext_rec->tr_stat_id_1 ||809 * So we have the situation that the remove is on disk,
810 XT_GET_DISK_4(record->fr.fr_xact_id_4) != XT_GET_DISK_4(ext_rec->tr_xact_id_4))810 * but the index changes have not been made.
811 goto dont_remove_x_record;811 *
812812 * In this case, skipping to "do_rec_freed" is incorrect.
813 if (record->xl.xl_status_1 == XT_LOG_ENT_REC_REMOVED_EXT) {813 */
814 if (!XT_REC_IS_EXT_DLOG(ext_rec->tr_rec_type_1))814 if (record->fr.fr_stat_id_1 != ext_rec->tr_stat_id_1 ||
815 goto dont_remove_x_record;815 XT_GET_DISK_4(record->fr.fr_xact_id_4) != XT_GET_DISK_4(ext_rec->tr_xact_id_4))
816 if (red_size < offsetof(XTTabRecExtDRec, re_data))816 goto dont_remove_x_record;
817 goto dont_remove_x_record;817
818818 if (record->xl.xl_status_1 == XT_LOG_ENT_REC_REMOVED_EXT) {
819 /* Save this for later (can be overwritten by xres_load_record(): */819 if (!XT_REC_IS_EXT_DLOG(ext_rec->tr_rec_type_1))
820 data_log_id = XT_GET_DISK_2(ext_rec->re_log_id_2);820 goto dont_remove_x_record;
821 data_log_offset = XT_GET_DISK_6(ext_rec->re_log_offs_6);821 if (red_size < offsetof(XTTabRecExtDRec, re_data))
822 log_over_size = XT_GET_DISK_4(ext_rec->re_log_dat_siz_4);822 goto dont_remove_x_record;
823 }823
824 dont_remove_x_record:824 /* Save this for later (can be overwritten by xres_load_record(): */
825825 data_log_id = XT_GET_DISK_2(ext_rec->re_log_id_2);
826 record_loaded = FALSE;826 data_log_offset = XT_GET_DISK_6(ext_rec->re_log_offs_6);
827827 log_over_size = XT_GET_DISK_4(ext_rec->re_log_dat_siz_4);
828 if (check_index) {828 }
829 cols_required = tab->tab_dic.dic_ind_cols_req;829 dont_remove_x_record:
830 if (tab->tab_dic.dic_blob_cols_req > cols_required)830
831 cols_required = tab->tab_dic.dic_blob_cols_req;831 record_loaded = FALSE;
832 if (!(rec_data = xres_load_record(self, ot, rec_id, ot->ot_row_rbuffer, red_size, rec_buf, cols_required)))832
833 goto do_rec_freed;833 if (check_index) {
834 record_loaded = TRUE;834 cols_required = tab->tab_dic.dic_ind_cols_req;
835 xres_remove_index_entries(ot, rec_id, rec_data);835 if (tab->tab_dic.dic_blob_cols_req > cols_required)
836 }836 cols_required = tab->tab_dic.dic_blob_cols_req;
837837 if (!(rec_data = xres_load_record(self, ot, rec_id, ot->ot_row_rbuffer, red_size, rec_buf, cols_required)))
838 if (tab->tab_dic.dic_blob_count) {838 goto do_rec_freed;
839 if (!record_loaded) {839 record_loaded = TRUE;
840 if (tab->tab_dic.dic_blob_cols_req > cols_required)840 xres_remove_index_entries(ot, rec_id, rec_data);
841 cols_required = tab->tab_dic.dic_blob_cols_req;841 }
842 if (!(rec_data = xres_load_record(self, ot, rec_id, ot->ot_row_rbuffer, red_size, rec_buf, cols_required)))842
843 /* [(7)] REMOVE is followed by FREE:843 if (tab->tab_dic.dic_blob_count) {
844 goto get_rec_offset;844 if (!record_loaded) {
845 */845 if (tab->tab_dic.dic_blob_cols_req > cols_required)
846 goto do_rec_freed;846 cols_required = tab->tab_dic.dic_blob_cols_req;
847 record_loaded = TRUE;847 if (!(rec_data = xres_load_record(self, ot, rec_id, ot->ot_row_rbuffer, red_size, rec_buf, cols_required)))
848 }848 /* [(7)] REMOVE is followed by FREE:
849#ifdef XT_STREAMING849 goto get_rec_offset;
850 myxt_release_blobs(ot, rec_data, rec_id);850 */
851#endif851 goto do_rec_freed;
852 }852 record_loaded = TRUE;
853853 }
854 if (record->xl.xl_status_1 == XT_LOG_ENT_REC_REMOVED_EXT) {854#ifdef XT_STREAMING
855 /* Note: dlb_delete_log() may be repeated, but should handle this:855 myxt_release_blobs(ot, rec_data, rec_id);
856 * 856#endif
857 * Example:857 }
858 * log=5 offset=213334 CLEAN REC op=28175 tab=1 rec=317428 858
859 * ...859 if (record->xl.xl_status_1 == XT_LOG_ENT_REC_REMOVED_EXT) {
860 * log=6 offset=321063 REMOVED-X REC op=33878 tab=1 rec=317428 860 /* Note: dlb_delete_log() may be repeated, but should handle this:
861 *861 *
862 * When this sequence is repeated during recovery, then CLEAN REC862 * Example:
863 * will reset the status byte of the record so that it863 * log=5 offset=213334 CLEAN REC op=28175 tab=1 rec=317428
864 * comes back to here!864 * ...
865 *865 * log=6 offset=321063 REMOVED-X REC op=33878 tab=1 rec=317428
866 * The check for zero is probably not required here.866 *
867 */867 * When this sequence is repeated during recovery, then CLEAN REC
868 if (data_log_id && data_log_offset && log_over_size) {868 * will reset the status byte of the record so that it
869 if (!ot->ot_thread->st_dlog_buf.dlb_delete_log(data_log_id, data_log_offset, log_over_size, tab->tab_id, rec_id, self)) {869 * comes back to here!
870 if (ot->ot_thread->t_exception.e_xt_err != XT_ERR_BAD_EXT_RECORD &&870 *
871 ot->ot_thread->t_exception.e_xt_err != XT_ERR_DATA_LOG_NOT_FOUND)871 * The check for zero is probably not required here.
872 xt_log_and_clear_exception_ns();872 */
873 }873 if (data_log_id && data_log_offset && log_over_size) {
874 }874 if (!ot->ot_thread->st_dlog_buf.dlb_delete_log(data_log_id, data_log_offset, log_over_size, tab->tab_id, rec_id, self)) {
875 }875 if (ot->ot_thread->t_exception.e_xt_err != XT_ERR_BAD_EXT_RECORD &&
876876 ot->ot_thread->t_exception.e_xt_err != XT_ERR_DATA_LOG_NOT_FOUND)
877 goto do_rec_freed;877 xt_log_and_clear_exception_ns();
878 }878 }
879 case XT_LOG_ENT_REC_REMOVED_BI: {879 }
880 /*880 }
881 * For deletion we need the complete before image because of the following problem.881
882 *882 goto do_rec_freed;
883 * DROP TABLE IF EXISTS t1;883 }
884 * CREATE TABLE t1 (ID int primary key auto_increment, value int, index (value)) engine=pbxt;884 case XT_LOG_ENT_REC_REMOVED_BI: {
885 * 885 /*
886 * insert t1(value) values(50);886 * For deletion we need the complete before image because of the following problem.
887 * 887 *
888 * -- CHECKPOINT --888 * DROP TABLE IF EXISTS t1;
889 * 889 * CREATE TABLE t1 (ID int primary key auto_increment, value int, index (value)) engine=pbxt;
890 * update t1 set value = 60;890 *
891 * 891 * insert t1(value) values(50);
892 * -- PAUSE --892 *
893 * 893 * -- CHECKPOINT --
894 * update t1 set value = 70;894 *
895 * 895 * update t1 set value = 60;
896 * -- CRASH --896 *
897 * 897 * -- PAUSE --
898 * select value from t1;898 *
899 * select * from t1;899 * update t1 set value = 70;
900 * 900 *
901 * 081203 12:11:46 [Note] PBXT: Recovering from 1-148, bytes to read: 33554284901 * -- CRASH --
902 * log=1 offset=148 UPDATE-BG op=5 tab=1 rec=2 row=1 xact=3 902 *
903 * log=1 offset=188 REC ADD ROW op=6 tab=1 row=1 903 * select value from t1;
904 * log=1 offset=206 COMMIT xact=3 904 * select * from t1;
905 * log=1 offset=216 REMOVED REC op=7 tab=1 rec=1 xact=2 905 *
906 * log=1 offset=241 CLEAN REC op=8 tab=1 rec=2 906 * 081203 12:11:46 [Note] PBXT: Recovering from 1-148, bytes to read: 33554284
907 * log=1 offset=261 CLEANUP xact=3 907 * log=1 offset=148 UPDATE-BG op=5 tab=1 rec=2 row=1 xact=3
908 * log=1 offset=267 UPDATE-FL-BG op=9 tab=1 rec=1 row=1 xact=4 908 * log=1 offset=188 REC ADD ROW op=6 tab=1 row=1
909 * log=1 offset=311 REC ADD ROW op=10 tab=1 row=1 909 * log=1 offset=206 COMMIT xact=3
910 * log=1 offset=329 COMMIT xact=4 910 * log=1 offset=216 REMOVED REC op=7 tab=1 rec=1 xact=2
911 * log=1 offset=339 REMOVED REC op=11 tab=1 rec=2 xact=3 911 * log=1 offset=241 CLEAN REC op=8 tab=1 rec=2
912 * log=1 offset=364 CLEAN REC op=12 tab=1 rec=1 912 * log=1 offset=261 CLEANUP xact=3
913 * log=1 offset=384 CLEANUP xact=4 913 * log=1 offset=267 UPDATE-FL-BG op=9 tab=1 rec=1 row=1 xact=4
914 * 081203 12:12:15 [Note] PBXT: Recovering complete at 1-390, bytes read: 33554284914 * log=1 offset=311 REC ADD ROW op=10 tab=1 row=1
915 * 915 * log=1 offset=329 COMMIT xact=4
916 * mysql> select value from t1;916 * log=1 offset=339 REMOVED REC op=11 tab=1 rec=2 xact=3
917 * +-------+917 * log=1 offset=364 CLEAN REC op=12 tab=1 rec=1
918 * | value |918 * log=1 offset=384 CLEANUP xact=4
919 * +-------+919 * 081203 12:12:15 [Note] PBXT: Recovering complete at 1-390, bytes read: 33554284
920 * | 50 | 920 *
921 * | 70 | 921 * mysql> select value from t1;
922 * +-------+922 * +-------+
923 * 2 rows in set (55.99 sec)923 * | value |
924 * 924 * +-------+
925 * mysql> select * from t1;925 * | 50 |
926 * +----+-------+926 * | 70 |
927 * | ID | value |927 * +-------+
928 * +----+-------+928 * 2 rows in set (55.99 sec)
929 * | 1 | 70 | 929 *
930 * +----+-------+930 * mysql> select * from t1;
931 * 1 row in set (0.00 sec)931 * +----+-------+
932 */932 * | ID | value |
933 XTTabRecExtDPtr ext_rec;933 * +----+-------+
934 xtWord4 log_over_size = 0;934 * | 1 | 70 |
935 xtLogID data_log_id = 0;935 * +----+-------+
936 xtLogOffset data_log_offset = 0;936 * 1 row in set (0.00 sec)
937 u_int cols_required = 0;937 */
938 xtBool record_loaded;938 XTTabRecExtDPtr ext_rec;
939 size_t rec_size; 939 xtWord4 log_over_size = 0;
940940 xtLogID data_log_id = 0;
941 rec_id = XT_GET_DISK_4(record->rb.rb_rec_id_4);941 xtLogOffset data_log_offset = 0;
942 rec_size = XT_GET_DISK_2(record->rb.rb_size_2);942 u_int cols_required = 0;
943943 xtBool record_loaded;
944 ext_rec = (XTTabRecExtDPtr) &record->rb.rb_rec_type_1;944 size_t rec_size;
945945
946 if (XT_REC_IS_EXT_DLOG(record->rb.rb_rec_type_1)) {946 rec_id = XT_GET_DISK_4(record->rb.rb_rec_id_4);
947 /* Save this for later (can be overwritten by xres_load_record(): */947 rec_size = XT_GET_DISK_2(record->rb.rb_size_2);
948 data_log_id = XT_GET_DISK_2(ext_rec->re_log_id_2);948
949 data_log_offset = XT_GET_DISK_6(ext_rec->re_log_offs_6);949 ext_rec = (XTTabRecExtDPtr) &record->rb.rb_rec_type_1;
950 log_over_size = XT_GET_DISK_4(ext_rec->re_log_dat_siz_4);950
951 }951 if (XT_REC_IS_EXT_DLOG(record->rb.rb_rec_type_1)) {
952952 /* Save this for later (can be overwritten by xres_load_record(): */
953 record_loaded = FALSE;953 data_log_id = XT_GET_DISK_2(ext_rec->re_log_id_2);
954954 data_log_offset = XT_GET_DISK_6(ext_rec->re_log_offs_6);
955 if (check_index) {955 log_over_size = XT_GET_DISK_4(ext_rec->re_log_dat_siz_4);
956 cols_required = tab->tab_dic.dic_ind_cols_req;956 }
957#ifdef XT_STREAMING957
958 if (tab->tab_dic.dic_blob_cols_req > cols_required)958 record_loaded = FALSE;
959 cols_required = tab->tab_dic.dic_blob_cols_req;959
960#endif960 if (check_index) {
961 if (!(rec_data = xres_load_record(self, ot, rec_id, &record->rb.rb_rec_type_1, rec_size, rec_buf, cols_required)))961 cols_required = tab->tab_dic.dic_ind_cols_req;
962 goto go_on_to_free;962#ifdef XT_STREAMING
963 record_loaded = TRUE;963 if (tab->tab_dic.dic_blob_cols_req > cols_required)
964 xres_remove_index_entries(ot, rec_id, rec_data);964 cols_required = tab->tab_dic.dic_blob_cols_req;
965 }965#endif
966966 if (!(rec_data = xres_load_record(self, ot, rec_id, &record->rb.rb_rec_type_1, rec_size, rec_buf, cols_required)))
967#ifdef XT_STREAMING967 goto go_on_to_free;
968 if (tab->tab_dic.dic_blob_count) {968 record_loaded = TRUE;
969 if (!record_loaded) {969 xres_remove_index_entries(ot, rec_id, rec_data);
970 cols_required = tab->tab_dic.dic_blob_cols_req;970 }
971 if (!(rec_data = xres_load_record(self, ot, rec_id, &record->rb.rb_rec_type_1, rec_size, rec_buf, cols_required)))971
972 /* [(7)] REMOVE is followed by FREE:972#ifdef XT_STREAMING
973 goto get_rec_offset;973 if (tab->tab_dic.dic_blob_count) {
974 */974 if (!record_loaded) {
975 goto go_on_to_free;975 cols_required = tab->tab_dic.dic_blob_cols_req;
976 record_loaded = TRUE;976 if (!(rec_data = xres_load_record(self, ot, rec_id, &record->rb.rb_rec_type_1, rec_size, rec_buf, cols_required)))
977 }977 /* [(7)] REMOVE is followed by FREE:
978 myxt_release_blobs(ot, rec_data, rec_id);978 goto get_rec_offset;
979 }979 */
980#endif980 goto go_on_to_free;
981981 record_loaded = TRUE;
982 if (data_log_id && data_log_offset && log_over_size) {982 }
983 if (!ot->ot_thread->st_dlog_buf.dlb_delete_log(data_log_id, data_log_offset, log_over_size, tab->tab_id, rec_id, self)) {983 myxt_release_blobs(ot, rec_data, rec_id);
984 if (ot->ot_thread->t_exception.e_xt_err != XT_ERR_BAD_EXT_RECORD &&984 }
985 ot->ot_thread->t_exception.e_xt_err != XT_ERR_DATA_LOG_NOT_FOUND)985#endif
986 xt_log_and_clear_exception_ns();986
987 }987 if (data_log_id && data_log_offset && log_over_size) {
988 }988 if (!ot->ot_thread->st_dlog_buf.dlb_delete_log(data_log_id, data_log_offset, log_over_size, tab->tab_id, rec_id, self)) {
989989 if (ot->ot_thread->t_exception.e_xt_err != XT_ERR_BAD_EXT_RECORD &&
990 go_on_to_free:990 ot->ot_thread->t_exception.e_xt_err != XT_ERR_DATA_LOG_NOT_FOUND)
991 /* Use the new record type: */991 xt_log_and_clear_exception_ns();
992 record->rb.rb_rec_type_1 = record->rb.rb_new_rec_type_1;992 }
993 free_data = (XTTabRecFreeDPtr) &record->rb.rb_rec_type_1;993 }
994 goto do_rec_freed;994
995 }995 go_on_to_free:
996 case XT_LOG_ENT_REC_FREED:996 /* Use the new record type: */
997 rec_id = XT_GET_DISK_4(record->fr.fr_rec_id_4);997 record->rb.rb_rec_type_1 = record->rb.rb_new_rec_type_1;
998 free_data = (XTTabRecFreeDPtr) &record->fr.fr_rec_type_1;998 free_data = (XTTabRecFreeDPtr) &record->rb.rb_rec_type_1;
999 do_rec_freed:999 goto do_rec_freed;
1000 if (!in_sequence) {1000 }
1001 size_t red_size;1001 case XT_LOG_ENT_REC_FREED:
10021002 rec_id = XT_GET_DISK_4(record->fr.fr_rec_id_4);
1003 /* Free the record.1003 free_data = (XTTabRecFreeDPtr) &record->fr.fr_rec_type_1;
1004 * We place the record on front of the current1004 do_rec_freed:
1005 * free list.1005 if (!in_sequence) {
1006 *1006 size_t red_size;
1007 * However, before we do this, we remove the record1007
1008 * from its row list, if the record is on a row list.1008 /* Free the record.
1009 *1009 * We place the record on front of the current
1010 * We do this here, because in the normal removal1010 * free list.
1011 * from the row list uses the operations:1011 *
1012 *1012 * However, before we do this, we remove the record
1013 * XT_LOG_ENT_REC_UNLINKED, XT_LOG_ENT_ROW_SET and1013 * from its row list, if the record is on a row list.
1014 * XT_LOG_ENT_ROW_FREED.1014 *
1015 *1015 * We do this here, because in the normal removal
1016 * When operations are performed out of sequence,1016 * from the row list uses the operations:
1017 * these operations are ignored for the purpose1017 *
1018 * of removing the record from the row.1018 * XT_LOG_ENT_REC_UNLINKED, XT_LOG_ENT_ROW_SET and
1019 */1019 * XT_LOG_ENT_ROW_FREED.
1020 if (!XT_PREAD_RR_FILE(ot->ot_rec_file, xt_rec_id_to_rec_offset(tab, rec_id), sizeof(XTTabRecHeadDRec), sizeof(XTTabRecHeadDRec), (xtWord1 *) &rec_head, NULL, &self->st_statistics.st_rec, self))1020 *
1021 xt_throw(self);1021 * When operations are performed out of sequence,
1022 /* The record is already free: */1022 * these operations are ignored for the purpose
1023 if (XT_REC_IS_FREE(rec_head.tr_rec_type_1))1023 * of removing the record from the row.
1024 goto free_done;1024 */
1025 row_id = XT_GET_DISK_4(rec_head.tr_row_id_4);1025 if (!XT_PREAD_RR_FILE(ot->ot_rec_file, xt_rec_id_to_rec_offset(tab, rec_id), sizeof(XTTabRecHeadDRec), sizeof(XTTabRecHeadDRec), (xtWord1 *) &rec_head, NULL, &self->st_statistics.st_rec, self))
10261026 xt_throw(self);
1027 /* Search the row for this record: */1027 /* The record is already free: */
1028 if (!XT_PREAD_RR_FILE(ot->ot_row_file, xt_row_id_to_row_offset(tab, row_id), sizeof(XTTabRowRefDRec), sizeof(XTTabRowRefDRec), (xtWord1 *) &row_buf, NULL, &self->st_statistics.st_rec, self))1028 if (XT_REC_IS_FREE(rec_head.tr_rec_type_1))
1029 xt_throw(self);1029 goto free_done;
1030 link_rec_id = XT_GET_DISK_4(row_buf.rr_ref_id_4);1030 row_id = XT_GET_DISK_4(rec_head.tr_row_id_4);
1031 prev_link_rec_id = 0;1031
1032 while (link_rec_id) {1032 /* Search the row for this record: */
1033 if (!XT_PREAD_RR_FILE(ot->ot_rec_file, xt_rec_id_to_rec_offset(tab, link_rec_id), sizeof(XTTabRecHeadDRec), 0, (xtWord1 *) &rec_head, &red_size, &self->st_statistics.st_rec, self)) {1033 if (!XT_PREAD_RR_FILE(ot->ot_row_file, xt_row_id_to_row_offset(tab, row_id), sizeof(XTTabRowRefDRec), sizeof(XTTabRowRefDRec), (xtWord1 *) &row_buf, NULL, &self->st_statistics.st_rec, self))
1034 xt_log_and_clear_exception(self);1034 xt_throw(self);
1035 break;1035 link_rec_id = XT_GET_DISK_4(row_buf.rr_ref_id_4);
1036 }1036 prev_link_rec_id = 0;
1037 if (red_size < sizeof(XTTabRecHeadDRec))1037 while (link_rec_id) {
1038 break;1038 if (!XT_PREAD_RR_FILE(ot->ot_rec_file, xt_rec_id_to_rec_offset(tab, link_rec_id), sizeof(XTTabRecHeadDRec), 0, (xtWord1 *) &rec_head, &red_size, &self->st_statistics.st_rec, self)) {
1039 if (link_rec_id == rec_id)1039 xt_log_and_clear_exception(self);
1040 break;1040 break;
1041 if (XT_GET_DISK_4(rec_head.tr_row_id_4) != row_id)1041 }
1042 break;1042 if (red_size < sizeof(XTTabRecHeadDRec))
1043 switch (rec_head.tr_rec_type_1 & XT_TAB_STATUS_MASK) {1043 break;
1044 case XT_TAB_STATUS_FREED:1044 if (link_rec_id == rec_id)
1045 break;1045 break;
1046 case XT_TAB_STATUS_DELETE:1046 if (XT_GET_DISK_4(rec_head.tr_row_id_4) != row_id)
1047 case XT_TAB_STATUS_FIXED:1047 break;
1048 case XT_TAB_STATUS_VARIABLE:1048 switch (rec_head.tr_rec_type_1 & XT_TAB_STATUS_MASK) {
1049 case XT_TAB_STATUS_EXT_DLOG:1049 case XT_TAB_STATUS_FREED:
1050 break;1050 break;
1051 default:1051 case XT_TAB_STATUS_DELETE:
1052 ASSERT(FALSE);1052 case XT_TAB_STATUS_FIXED:
1053 goto exit_loop;1053 case XT_TAB_STATUS_VARIABLE:
1054 }1054 case XT_TAB_STATUS_EXT_DLOG:
1055 if (rec_head.tr_rec_type_1 & ~(XT_TAB_STATUS_CLEANED_BIT | XT_TAB_STATUS_MASK)) {1055 break;
1056 ASSERT(FALSE);1056 default:
1057 break;1057 ASSERT(FALSE);
1058 }1058 goto exit_loop;
1059 prev_link_rec_id = link_rec_id;1059 }
1060 link_rec_id = XT_GET_DISK_4(rec_head.tr_prev_rec_id_4);1060 if (rec_head.tr_rec_type_1 & ~(XT_TAB_STATUS_CLEANED_BIT | XT_TAB_STATUS_MASK)) {
1061 }1061 ASSERT(FALSE);
10621062 break;
1063 exit_loop:1063 }
1064 if (link_rec_id == rec_id) {1064 prev_link_rec_id = link_rec_id;
1065 /* The record was found on the row list, remove it: */1065 link_rec_id = XT_GET_DISK_4(rec_head.tr_prev_rec_id_4);
1066 if (prev_link_rec_id) {1066 }
1067 /* We write the previous variation pointer from position 'link_rec_id' into1067
1068 * variation pointer of the 'prev_link_rec_id' record. This unlinks 'link_rec_id'!1068 exit_loop:
1069 */1069 if (link_rec_id == rec_id) {
1070 if (!XT_PWRITE_RR_FILE(ot->ot_rec_file, xt_rec_id_to_rec_offset(tab, prev_link_rec_id) + offsetof(XTTabRecHeadDRec, tr_prev_rec_id_4), XT_RECORD_ID_SIZE, (xtWord1 *) &rec_head.tr_prev_rec_id_4, &ot->ot_thread->st_statistics.st_rec, ot->ot_thread))1070 /* The record was found on the row list, remove it: */
1071 xt_throw(self);1071 if (prev_link_rec_id) {
1072 tab->tab_bytes_to_flush += XT_RECORD_ID_SIZE;1072 /* We write the previous variation pointer from position 'link_rec_id' into
1073 }1073 * variation pointer of the 'prev_link_rec_id' record. This unlinks 'link_rec_id'!
1074 else {1074 */
1075 /* The record is at the front of the row list: */1075 if (!XT_PWRITE_RR_FILE(ot->ot_rec_file, xt_rec_id_to_rec_offset(tab, prev_link_rec_id) + offsetof(XTTabRecHeadDRec, tr_prev_rec_id_4), XT_RECORD_ID_SIZE, (xtWord1 *) &rec_head.tr_prev_rec_id_4, &ot->ot_thread->st_statistics.st_rec, ot->ot_thread))
1076 xtRefID ref_id = XT_GET_DISK_4(rec_head.tr_prev_rec_id_4);1076 xt_throw(self);
1077 XT_SET_DISK_4(row_buf.rr_ref_id_4, ref_id);1077 tab->tab_bytes_to_flush += XT_RECORD_ID_SIZE;
1078 if (!XT_PWRITE_RR_FILE(ot->ot_row_file, xt_row_id_to_row_offset(tab, row_id), sizeof(XTTabRowRefDRec), (xtWord1 *) &row_buf, &ot->ot_thread->st_statistics.st_rec, ot->ot_thread))1078 }
1079 xt_throw(self);1079 else {
1080 tab->tab_bytes_to_flush += sizeof(XTTabRowRefDRec);1080 /* The record is at the front of the row list: */
1081 }1081 xtRefID ref_id = XT_GET_DISK_4(rec_head.tr_prev_rec_id_4);
1082 } 1082 XT_SET_DISK_4(row_buf.rr_ref_id_4, ref_id);
10831083 if (!XT_PWRITE_RR_FILE(ot->ot_row_file, xt_row_id_to_row_offset(tab, row_id), sizeof(XTTabRowRefDRec), (xtWord1 *) &row_buf, &ot->ot_thread->st_statistics.st_rec, ot->ot_thread))
1084 /* Now we free the record, by placing it at the front of1084 xt_throw(self);
1085 * the free list:1085 tab->tab_bytes_to_flush += sizeof(XTTabRowRefDRec);
1086 */1086 }
1087 XT_SET_DISK_4(free_data->rf_next_rec_id_4, tab->tab_head_rec_free_id); 1087 }
1088 }1088
1089 tab->tab_head_rec_free_id = rec_id;1089 /* Now we free the record, by placing it at the front of
1090 tab->tab_head_rec_fnum++;1090 * the free list:
1091 if (!XT_PWRITE_RR_FILE(ot->ot_rec_file, xt_rec_id_to_rec_offset(tab, rec_id), sizeof(XTTabRecFreeDRec), (xtWord1 *) free_data, &ot->ot_thread->st_statistics.st_rec, ot->ot_thread))1091 */
1092 xt_throw(self);1092 XT_SET_DISK_4(free_data->rf_next_rec_id_4, tab->tab_head_rec_free_id);
1093 tab->tab_bytes_to_flush += sizeof(XTTabRecFreeDRec);1093 }
1094 tab->tab_flush_pending = TRUE;1094 tab->tab_head_rec_free_id = rec_id;
1095 free_done:1095 tab->tab_head_rec_fnum++;
1096 break;1096 if (!XT_PWRITE_RR_FILE(ot->ot_rec_file, xt_rec_id_to_rec_offset(tab, rec_id), sizeof(XTTabRecFreeDRec), (xtWord1 *) free_data, &ot->ot_thread->st_statistics.st_rec, ot->ot_thread))
1097 case XT_LOG_ENT_REC_MOVED:1097 xt_throw(self);
1098 len = 8;1098 tab->tab_bytes_to_flush += sizeof(XTTabRecFreeDRec);
1099 rec_id = XT_GET_DISK_4(record->xw.xw_rec_id_4);1099 tab->tab_flush_pending = TRUE;
1100 if (!XT_PWRITE_RR_FILE(ot->ot_rec_file, xt_rec_id_to_rec_offset(tab, rec_id) + offsetof(XTTabRecExtDRec, re_log_id_2), len, (xtWord1 *) &record->xw.xw_rec_type_1, &ot->ot_thread->st_statistics.st_rec, ot->ot_thread))1100 free_done:
1101 xt_throw(self);1101 break;
1102 tab->tab_bytes_to_flush += len;1102 case XT_LOG_ENT_REC_MOVED:
1103 tab->tab_flush_pending = TRUE;1103 len = 8;
1104 break;1104 rec_id = XT_GET_DISK_4(record->xw.xw_rec_id_4);
1105 case XT_LOG_ENT_REC_CLEANED:1105 if (!XT_PWRITE_RR_FILE(ot->ot_rec_file, xt_rec_id_to_rec_offset(tab, rec_id) + offsetof(XTTabRecExtDRec, re_log_id_2), len, (xtWord1 *) &record->xw.xw_rec_type_1, &ot->ot_thread->st_statistics.st_rec, ot->ot_thread))
1106 len = offsetof(XTTabRecHeadDRec, tr_prev_rec_id_4) + XT_RECORD_ID_SIZE;1106 xt_throw(self);
1107 goto get_rec_offset;1107 tab->tab_bytes_to_flush += len;
1108 case XT_LOG_ENT_REC_CLEANED_1:1108 tab->tab_flush_pending = TRUE;
1109 len = 1;1109 break;
1110 goto get_rec_offset;1110 case XT_LOG_ENT_REC_CLEANED:
1111 case XT_LOG_ENT_REC_UNLINKED:1111 len = offsetof(XTTabRecHeadDRec, tr_prev_rec_id_4) + XT_RECORD_ID_SIZE;
1112 if (!in_sequence) {1112 goto get_rec_offset;
1113 /* Unlink the record.1113 case XT_LOG_ENT_REC_CLEANED_1:
1114 * This is done when the record is freed.1114 len = 1;
1115 */1115 goto get_rec_offset;
1116 break;1116 case XT_LOG_ENT_REC_UNLINKED:
1117 }1117 if (!in_sequence) {
1118 len = offsetof(XTTabRecHeadDRec, tr_prev_rec_id_4) + XT_RECORD_ID_SIZE;1118 /* Unlink the record.
1119 get_rec_offset:1119 * This is done when the record is freed.
1120 rec_id = XT_GET_DISK_4(record->xw.xw_rec_id_4);1120 */
1121 if (!XT_PWRITE_RR_FILE(ot->ot_rec_file, xt_rec_id_to_rec_offset(tab, rec_id), len, (xtWord1 *) &record->xw.xw_rec_type_1, &ot->ot_thread->st_statistics.st_rec, ot->ot_thread))1121 break;
1122 xt_throw(self);1122 }
1123 tab->tab_bytes_to_flush += len;1123 len = offsetof(XTTabRecHeadDRec, tr_prev_rec_id_4) + XT_RECORD_ID_SIZE;
1124 tab->tab_flush_pending = TRUE;1124 get_rec_offset:
1125 break;1125 rec_id = XT_GET_DISK_4(record->xw.xw_rec_id_4);
1126 case XT_LOG_ENT_ROW_NEW:1126 if (!XT_PWRITE_RR_FILE(ot->ot_rec_file, xt_rec_id_to_rec_offset(tab, rec_id), len, (xtWord1 *) &record->xw.xw_rec_type_1, &ot->ot_thread->st_statistics.st_rec, ot->ot_thread))
1127 len = offsetof(XTactRowAddedEntryDRec, xa_free_list_4);1127 xt_throw(self);
1128 row_id = XT_GET_DISK_4(record->xa.xa_row_id_4);1128 tab->tab_bytes_to_flush += len;
1129 if (!in_sequence) {1129 tab->tab_flush_pending = TRUE;
1130 /* A row was allocated from the EOF. Because operations are missing.1130 break;
1131 * The blocks between the current EOF and the new EOF need to be1131 case XT_LOG_ENT_ROW_NEW:
1132 * place on the free list!1132 len = offsetof(XTactRowAddedEntryDRec, xa_free_list_4);
1133 */ 1133 row_id = XT_GET_DISK_4(record->xa.xa_row_id_4);
1134 while (tab->tab_head_row_eof_id < row_id) {1134 if (!in_sequence) {
1135 XT_SET_DISK_4(row_buf.rr_ref_id_4, tab->tab_head_row_free_id);1135 /* A row was allocated from the EOF. Because operations are missing.
1136 if (!XT_PWRITE_RR_FILE(ot->ot_row_file, xt_row_id_to_row_offset(tab, tab->tab_head_row_eof_id), sizeof(XTTabRowRefDRec), (xtWord1 *) &row_buf, &ot->ot_thread->st_statistics.st_rec, ot->ot_thread))1136 * The blocks between the current EOF and the new EOF need to be
1137 xt_throw(self);1137 * place on the free list!
1138 tab->tab_bytes_to_flush += sizeof(XTTabRowRefDRec);1138 */
1139 tab->tab_head_row_free_id = tab->tab_head_row_eof_id;1139 while (tab->tab_head_row_eof_id < row_id) {
1140 tab->tab_head_row_eof_id++;1140 XT_SET_DISK_4(row_buf.rr_ref_id_4, tab->tab_head_row_free_id);
1141 }1141 if (!XT_PWRITE_RR_FILE(ot->ot_row_file, xt_row_id_to_row_offset(tab, tab->tab_head_row_eof_id), sizeof(XTTabRowRefDRec), (xtWord1 *) &row_buf, &ot->ot_thread->st_statistics.st_rec, ot->ot_thread))
1142 }1142 xt_throw(self);
1143 if (tab->tab_head_row_eof_id < row_id + 1)1143 tab->tab_bytes_to_flush += sizeof(XTTabRowRefDRec);
1144 tab->tab_head_row_eof_id = row_id + 1;1144 tab->tab_head_row_free_id = tab->tab_head_row_eof_id;
1145 tab->tab_flush_pending = TRUE;1145 tab->tab_head_row_eof_id++;
1146 break;1146 }
1147 case XT_LOG_ENT_ROW_NEW_FL:1147 }
1148 len = sizeof(XTactRowAddedEntryDRec);1148 if (tab->tab_head_row_eof_id < row_id + 1)
1149 row_id = XT_GET_DISK_4(record->xa.xa_row_id_4);1149 tab->tab_head_row_eof_id = row_id + 1;
1150 free_ref_id = XT_GET_DISK_4(record->xa.xa_free_list_4);1150 tab->tab_flush_pending = TRUE;
1151 if (!in_sequence) {1151 break;
1152 size_t red_size;1152 case XT_LOG_ENT_ROW_NEW_FL:
1153 /* The record was taken from the free list.1153 len = sizeof(XTactRowAddedEntryDRec);
1154 * If the operations were in sequence, then this would be1154 row_id = XT_GET_DISK_4(record->xa.xa_row_id_4);
1155 * the front of the free list now.1155 free_ref_id = XT_GET_DISK_4(record->xa.xa_free_list_4);
1156 * However, because operations are missing, it may no1156 if (!in_sequence) {
1157 * longer be the front of the free list!1157 size_t red_size;
1158 * Search and remove:1158 /* The record was taken from the free list.
1159 */1159 * If the operations were in sequence, then this would be
1160 link_rec_id = tab->tab_head_row_free_id;1160 * the front of the free list now.
1161 prev_link_rec_id = 0;1161 * However, because operations are missing, it may no
1162 while (link_rec_id) {1162 * longer be the front of the free list!
1163 if (!XT_PREAD_RR_FILE(ot->ot_row_file, xt_row_id_to_row_offset(tab, link_rec_id), sizeof(XTTabRowRefDRec), 0, (xtWord1 *) &row_buf, &red_size, &self->st_statistics.st_rec, self)) {1163 * Search and remove:
1164 xt_log_and_clear_exception(self);1164 */
1165 break;1165 link_rec_id = tab->tab_head_row_free_id;
1166 }1166 prev_link_rec_id = 0;
1167 if (red_size < sizeof(XTTabRowRefDRec))1167 while (link_rec_id) {
1168 break;1168 if (!XT_PREAD_RR_FILE(ot->ot_row_file, xt_row_id_to_row_offset(tab, link_rec_id), sizeof(XTTabRowRefDRec), 0, (xtWord1 *) &row_buf, &red_size, &self->st_statistics.st_rec, self)) {
1169 if (link_rec_id == row_id)1169 xt_log_and_clear_exception(self);
1170 break;1170 break;
1171 prev_link_rec_id = link_rec_id;1171 }
1172 link_rec_id = XT_GET_DISK_4(row_buf.rr_ref_id_4);1172 if (red_size < sizeof(XTTabRowRefDRec))
1173 }1173 break;
1174 if (link_rec_id == row_id) {1174 if (link_rec_id == row_id)
1175 /* The block was found on the free list, remove it: */1175 break;
1176 if (prev_link_rec_id) {1176 prev_link_rec_id = link_rec_id;
1177 /* We write the record from position 'link_rec_id' into1177 link_rec_id = XT_GET_DISK_4(row_buf.rr_ref_id_4);
1178 * position 'prev_link_rec_id'. This unlinks 'link_rec_id'!1178 }
1179 */1179 if (link_rec_id == row_id) {
1180 if (!XT_PWRITE_RR_FILE(ot->ot_row_file, xt_row_id_to_row_offset(tab, prev_link_rec_id), sizeof(XTTabRowRefDRec), (xtWord1 *) &row_buf, &ot->ot_thread->st_statistics.st_rec, ot->ot_thread))1180 /* The block was found on the free list, remove it: */
1181 xt_throw(self);1181 if (prev_link_rec_id) {
1182 tab->tab_bytes_to_flush += sizeof(XTTabRowRefDRec);1182 /* We write the record from position 'link_rec_id' into
1183 free_ref_id = tab->tab_head_row_free_id;1183 * position 'prev_link_rec_id'. This unlinks 'link_rec_id'!
1184 }1184 */
1185 else1185 if (!XT_PWRITE_RR_FILE(ot->ot_row_file, xt_row_id_to_row_offset(tab, prev_link_rec_id), sizeof(XTTabRowRefDRec), (xtWord1 *) &row_buf, &ot->ot_thread->st_statistics.st_rec, ot->ot_thread))
1186 /* The block is at the front of the free list: */1186 xt_throw(self);
1187 free_ref_id = XT_GET_DISK_4(row_buf.rr_ref_id_4);1187 tab->tab_bytes_to_flush += sizeof(XTTabRowRefDRec);
1188 }1188 free_ref_id = tab->tab_head_row_free_id;
1189 else {1189 }
1190 /* Not found? */1190 else
1191 if (tab->tab_head_row_eof_id < row_id + 1)1191 /* The block is at the front of the free list: */
1192 tab->tab_head_row_eof_id = row_id + 1;1192 free_ref_id = XT_GET_DISK_4(row_buf.rr_ref_id_4);
1193 break;1193 }
1194 }1194 else {
1195 1195 /* Not found? */
1196 }1196 if (tab->tab_head_row_eof_id < row_id + 1)
1197 if (tab->tab_head_row_eof_id < row_id + 1)1197 tab->tab_head_row_eof_id = row_id + 1;
1198 tab->tab_head_row_eof_id = row_id + 1;1198 break;
1199 tab->tab_head_row_free_id = free_ref_id;1199 }
1200 tab->tab_head_row_fnum--;1200
1201 tab->tab_flush_pending = TRUE;1201 }
1202 break;1202 if (tab->tab_head_row_eof_id < row_id + 1)
1203 case XT_LOG_ENT_ROW_FREED:1203 tab->tab_head_row_eof_id = row_id + 1;
1204 row_id = XT_GET_DISK_4(record->wr.wr_row_id_4);1204 tab->tab_head_row_free_id = free_ref_id;
1205 if (!in_sequence) {1205 tab->tab_head_row_fnum--;
1206 /* Free the row.1206 tab->tab_flush_pending = TRUE;
1207 * Since this operation is being performed out of sequence, we1207 break;
1208 * must assume that some other free and allocation operations1208 case XT_LOG_ENT_ROW_FREED:
1209 * must be missing.1209 row_id = XT_GET_DISK_4(record->wr.wr_row_id_4);
1210 * For this reason, we add the row to the front of the1210 if (!in_sequence) {
1211 * existing free list.1211 /* Free the row.
1212 */1212 * Since this operation is being performed out of sequence, we
1213 XT_SET_DISK_4(record->wr.wr_ref_id_4, tab->tab_head_row_free_id);1213 * must assume that some other free and allocation operations
1214 }1214 * must be missing.
1215 tab->tab_head_row_free_id = row_id;1215 * For this reason, we add the row to the front of the
1216 tab->tab_head_row_fnum++;1216 * existing free list.
1217 goto write_row_data;1217 */
1218 case XT_LOG_ENT_ROW_ADD_REC:1218 XT_SET_DISK_4(record->wr.wr_ref_id_4, tab->tab_head_row_free_id);
1219 row_id = XT_GET_DISK_4(record->wr.wr_row_id_4);1219 }
1220 if (!in_sequence) {1220 tab->tab_head_row_free_id = row_id;
1221 if (!XT_PREAD_RR_FILE(ot->ot_row_file, xt_row_id_to_row_offset(tab, row_id), sizeof(XTTabRowRefDRec), 0, (xtWord1 *) &row_buf, &tfer, &self->st_statistics.st_rec, self))1221 tab->tab_head_row_fnum++;
1222 xt_throw(self);1222 goto write_row_data;
1223 if (tfer == sizeof(XTTabRowRefDRec)) {1223 case XT_LOG_ENT_ROW_ADD_REC:
1224 /* Add a record to the front of the row.1224 row_id = XT_GET_DISK_4(record->wr.wr_row_id_4);
1225 * This is easy, but we have to make sure that the next1225 if (!in_sequence) {
1226 * pointer in the record is correct.1226 if (!XT_PREAD_RR_FILE(ot->ot_row_file, xt_row_id_to_row_offset(tab, row_id), sizeof(XTTabRowRefDRec), 0, (xtWord1 *) &row_buf, &tfer, &self->st_statistics.st_rec, self))
1227 */1227 xt_throw(self);
1228 rec_id = XT_GET_DISK_4(record->wr.wr_ref_id_4);1228 if (tfer == sizeof(XTTabRowRefDRec)) {
1229 if (!XT_PREAD_RR_FILE(ot->ot_rec_file, xt_rec_id_to_rec_offset(tab, rec_id), sizeof(XTTabRecHeadDRec), 0, (xtWord1 *) &rec_head, &tfer, &self->st_statistics.st_rec, self))1229 /* Add a record to the front of the row.
1230 xt_throw(self);1230 * This is easy, but we have to make sure that the next
1231 if (tfer == sizeof(XTTabRecHeadDRec) && XT_GET_DISK_4(rec_head.tr_row_id_4) == row_id) {1231 * pointer in the record is correct.
1232 /* This is now the correct next pointer: */1232 */
1233 xtRecordID next_ref_id = XT_GET_DISK_4(row_buf.rr_ref_id_4);1233 rec_id = XT_GET_DISK_4(record->wr.wr_ref_id_4);
1234 if (XT_GET_DISK_4(rec_head.tr_prev_rec_id_4) != next_ref_id &&1234 if (!XT_PREAD_RR_FILE(ot->ot_rec_file, xt_rec_id_to_rec_offset(tab, rec_id), sizeof(XTTabRecHeadDRec), 0, (xtWord1 *) &rec_head, &tfer, &self->st_statistics.st_rec, self))
1235 rec_id != next_ref_id) {1235 xt_throw(self);
1236 XT_SET_DISK_4(rec_head.tr_prev_rec_id_4, next_ref_id);1236 if (tfer == sizeof(XTTabRecHeadDRec) && XT_GET_DISK_4(rec_head.tr_row_id_4) == row_id) {
1237 if (!XT_PWRITE_RR_FILE(ot->ot_rec_file, xt_rec_id_to_rec_offset(tab, rec_id), sizeof(XTTabRecHeadDRec), (xtWord1 *) &rec_head, &ot->ot_thread->st_statistics.st_rec, ot->ot_thread))1237 /* This is now the correct next pointer: */
1238 xt_throw(self);1238 xtRecordID next_ref_id = XT_GET_DISK_4(row_buf.rr_ref_id_4);
1239 tab->tab_bytes_to_flush += sizeof(XTTabRecHeadDRec);1239 if (XT_GET_DISK_4(rec_head.tr_prev_rec_id_4) != next_ref_id &&
1240 }1240 rec_id != next_ref_id) {
1241 }1241 XT_SET_DISK_4(rec_head.tr_prev_rec_id_4, next_ref_id);
1242 }1242 if (!XT_PWRITE_RR_FILE(ot->ot_rec_file, xt_rec_id_to_rec_offset(tab, rec_id), sizeof(XTTabRecHeadDRec), (xtWord1 *) &rec_head, &ot->ot_thread->st_statistics.st_rec, ot->ot_thread))
12431243 xt_throw(self);
1244 }1244 tab->tab_bytes_to_flush += sizeof(XTTabRecHeadDRec);
1245 goto write_row_data;1245 }
1246 case XT_LOG_ENT_ROW_SET:1246 }
1247 if (!in_sequence)1247 }
1248 /* This operation is ignored when out of sequence!1248
1249 * The operation is used to remove a record from a row.1249 }
1250 * This is done automatically when the record is freed.1250 goto write_row_data;
1251 */1251 case XT_LOG_ENT_ROW_SET:
1252 break;1252 if (!in_sequence)
1253 row_id = XT_GET_DISK_4(record->wr.wr_row_id_4);1253 /* This operation is ignored when out of sequence!
1254 write_row_data:1254 * The operation is used to remove a record from a row.
1255 ASSERT_NS(XT_GET_DISK_4(record->wr.wr_ref_id_4) < tab->tab_head_rec_eof_id);1255 * This is done automatically when the record is freed.
1256 if (!XT_PWRITE_RR_FILE(ot->ot_row_file, xt_row_id_to_row_offset(tab, row_id), sizeof(XTTabRowRefDRec), (xtWord1 *) &record->wr.wr_ref_id_4, &ot->ot_thread->st_statistics.st_rec, self))1256 */
1257 xt_throw(self);1257 break;
1258 tab->tab_bytes_to_flush += sizeof(XTTabRowRefDRec);1258 row_id = XT_GET_DISK_4(record->wr.wr_row_id_4);
1259 if (tab->tab_head_row_eof_id < row_id + 1)1259 write_row_data:
1260 tab->tab_head_row_eof_id = row_id + 1;1260 ASSERT_NS(XT_GET_DISK_4(record->wr.wr_ref_id_4) < tab->tab_head_rec_eof_id);
1261 tab->tab_flush_pending = TRUE;1261 if (!XT_PWRITE_RR_FILE(ot->ot_row_file, xt_row_id_to_row_offset(tab, row_id), sizeof(XTTabRowRefDRec), (xtWord1 *) &record->wr.wr_ref_id_4, &ot->ot_thread->st_statistics.st_rec, self))
1262 break;1262 xt_throw(self);
1263 case XT_LOG_ENT_NO_OP:1263 tab->tab_bytes_to_flush += sizeof(XTTabRowRefDRec);
1264 case XT_LOG_ENT_END_OF_LOG:1264 if (tab->tab_head_row_eof_id < row_id + 1)
1265 break;1265 tab->tab_head_row_eof_id = row_id + 1;
1266 }1266 tab->tab_flush_pending = TRUE;
1267}1267 break;
12681268 case XT_LOG_ENT_NO_OP:
1269/*1269 case XT_LOG_ENT_END_OF_LOG:
1270 * Apply all operations that have been buffered1270 break;
1271 * for a particular table.1271 }
1272 * Operations are buffered if they are1272}
1273 * read from the log out of sequence.1273
1274 *1274/*
1275 * In this case we buffer, and wait for the1275 * Apply all operations that have been buffered
1276 * out of sequence operations to arrive.1276 * for a particular table.
1277 *1277 * Operations are buffered if they are
1278 * When the server is running, this will always be1278 * read from the log out of sequence.
1279 * the case. A delay occurs while a transaction 1279 *
1280 * fills its private log buffer.1280 * In this case we buffer, and wait for the
1281 */1281 * out of sequence operations to arrive.
1282static void xres_apply_operations(XTThreadPtr self, XTWriterStatePtr ws, xtBool in_sequence)1282 *
1283{1283 * When the server is running, this will always be
1284 XTTableHPtr tab = ws->ws_ot->ot_table;1284 * the case. A delay occurs while a transaction
1285 u_int i = 0;1285 * fills its private log buffer.
1286 XTOperationPtr op;1286 */
1287 xtBool check_index;1287static void xres_apply_operations(XTThreadPtr self, XTWriterStatePtr ws, xtBool in_sequence)
12881288{
1289// XTDatabaseHPtr db, XTOpenTablePtr ot, XTXactSeqReadPtr sr, XTDataBufferPtr databuf1289 XTTableHPtr tab = ws->ws_ot->ot_table;
1290 xt_sl_lock(self, tab->tab_op_list);1290 u_int i = 0;
1291 for (;;) {1291 XTOperationPtr op;
1292 op = (XTOperationPtr) xt_sl_item_at(tab->tab_op_list, i);1292 xtBool check_index;
1293 if (!op)1293
1294 break;1294// XTDatabaseHPtr db, XTOpenTablePtr ot, XTXactSeqReadPtr sr, XTDataBufferPtr databuf
1295 if (in_sequence && tab->tab_head_op_seq+1 != op->or_op_seq)1295 xt_sl_lock(self, tab->tab_op_list);
1296 break;1296 for (;;) {
1297 xt_db_set_size(self, &ws->ws_databuf, (size_t) op->or_op_len);1297 op = (XTOperationPtr) xt_sl_item_at(tab->tab_op_list, i);
1298 if (!ws->ws_db->db_xlog.xlog_rnd_read(&ws->ws_seqread, op->or_log_id, op->or_log_offset, (size_t) op->or_op_len, ws->ws_databuf.db_data, NULL, self))1298 if (!op)
1299 xt_throw(self);1299 break;
1300 check_index = ws->ws_in_recover && xt_comp_log_pos(op->or_log_id, op->or_log_offset, ws->ws_ind_rec_log_id, ws->ws_ind_rec_log_offset) >= 0;1300 if (in_sequence && tab->tab_head_op_seq+1 != op->or_op_seq)
1301 xres_apply_change(self, ws->ws_ot, (XTXactLogBufferDPtr) ws->ws_databuf.db_data, in_sequence, check_index, &ws->ws_rec_buf);1301 break;
1302 tab->tab_head_op_seq = op->or_op_seq;1302 xt_db_set_size(self, &ws->ws_databuf, (size_t) op->or_op_len);
1303 if (tab->tab_wr_wake_freeer) {1303 if (!ws->ws_db->db_xlog.xlog_rnd_read(&ws->ws_seqread, op->or_log_id, op->or_log_offset, (size_t) op->or_op_len, ws->ws_databuf.db_data, NULL, self))
1304 if (!XTTableSeq::xt_op_is_before(tab->tab_head_op_seq, tab->tab_wake_freeer_op))1304 xt_throw(self);
1305 xt_wr_wake_freeer(self);1305 check_index = ws->ws_in_recover && xt_comp_log_pos(op->or_log_id, op->or_log_offset, ws->ws_ind_rec_log_id, ws->ws_ind_rec_log_offset) >= 0;
1306 }1306 xres_apply_change(self, ws->ws_ot, (XTXactLogBufferDPtr) ws->ws_databuf.db_data, in_sequence, check_index, &ws->ws_rec_buf);
1307 i++;1307 tab->tab_head_op_seq = op->or_op_seq;
1308 }1308 if (tab->tab_wr_wake_freeer) {
1309 xt_sl_remove_from_front(self, tab->tab_op_list, i);1309 if (!XTTableSeq::xt_op_is_before(tab->tab_head_op_seq, tab->tab_wake_freeer_op))
1310 xt_sl_unlock(self, tab->tab_op_list);1310 xt_wr_wake_freeer(self);
1311}1311 }
13121312 i++;
1313/* Check for operations still remaining on tables.1313 }
1314 * These operations are applied even though operations1314 xt_sl_remove_from_front(self, tab->tab_op_list, i);
1315 * in sequence are missing.1315 xt_sl_unlock(self, tab->tab_op_list);
1316 */1316}
1317xtBool xres_sync_operations(XTThreadPtr self, XTDatabaseHPtr db, XTWriterStatePtr ws)1317
1318{1318/* Check for operations still remaining on tables.
1319 u_int edx;1319 * These operations are applied even though operations
1320 XTTableEntryPtr te_ptr;1320 * in sequence are missing.
1321 XTTableHPtr tab;1321 */
1322 xtBool op_synced = FALSE;1322xtBool xres_sync_operations(XTThreadPtr self, XTDatabaseHPtr db, XTWriterStatePtr ws)
13231323{
1324 xt_enum_tables_init(&edx);1324 u_int edx;
1325 while ((te_ptr = xt_enum_tables_next(self, db, &edx))) {1325 XTTableEntryPtr te_ptr;
1326 /* Dirty read of tab_op_list OK, here because this is the1326 XTTableHPtr tab;
1327 * only thread that updates the list!1327 xtBool op_synced = FALSE;
1328 */1328
1329 if ((tab = te_ptr->te_table)) {1329 xt_enum_tables_init(&edx);
1330 if (xt_sl_get_size(tab->tab_op_list)) {1330 while ((te_ptr = xt_enum_tables_next(self, db, &edx))) {
1331 op_synced = TRUE;1331 /* Dirty read of tab_op_list OK, here because this is the
1332 if (xres_open_table(self, ws, te_ptr->te_tab_id))1332 * only thread that updates the list!
1333 xres_apply_operations(self, ws, FALSE);1333 */
1334 }1334 if ((tab = te_ptr->te_table)) {
13351335 if (xt_sl_get_size(tab->tab_op_list)) {
1336 /* Update the pointer cache: */1336 op_synced = TRUE;
1337 tab->tab_seq.xt_op_seq_set(self, tab->tab_head_op_seq+1);1337 if (xres_open_table(self, ws, te_ptr->te_tab_id))
1338 tab->tab_row_eof_id = tab->tab_head_row_eof_id;1338 xres_apply_operations(self, ws, FALSE);
1339 tab->tab_row_free_id = tab->tab_head_row_free_id;1339 }
1340 tab->tab_row_fnum = tab->tab_head_row_fnum;1340
1341 tab->tab_rec_eof_id = tab->tab_head_rec_eof_id;1341 /* Update the pointer cache: */
1342 tab->tab_rec_free_id = tab->tab_head_rec_free_id;1342 tab->tab_seq.xt_op_seq_set(self, tab->tab_head_op_seq+1);
1343 tab->tab_rec_fnum = tab->tab_head_rec_fnum;1343 tab->tab_row_eof_id = tab->tab_head_row_eof_id;
1344 }1344 tab->tab_row_free_id = tab->tab_head_row_free_id;
1345 }1345 tab->tab_row_fnum = tab->tab_head_row_fnum;
1346 return op_synced;1346 tab->tab_rec_eof_id = tab->tab_head_rec_eof_id;
1347}1347 tab->tab_rec_free_id = tab->tab_head_rec_free_id;
13481348 tab->tab_rec_fnum = tab->tab_head_rec_fnum;
1349/*1349 }
1350 * Operations from the log are applied in sequence order.1350 }
1351 * If the operations are out of sequence, they are buffered1351 return op_synced;
1352 * until the missing operations appear.1352}
1353 *1353
1354 * NOTE: No lock is required because there should only be1354/*
1355 * one thread that does this!1355 * Operations from the log are applied in sequence order.
1356 */1356 * If the operations are out of sequence, they are buffered
1357xtPublic void xt_xres_apply_in_order(XTThreadPtr self, XTWriterStatePtr ws, xtLogID log_id, xtLogOffset log_offset, XTXactLogBufferDPtr record)1357 * until the missing operations appear.
1358{1358 *
1359 xtOpSeqNo op_seq;1359 * NOTE: No lock is required because there should only be
1360 xtTableID tab_id;1360 * one thread that does this!
1361 size_t len;1361 */
1362 xtBool check_index;1362xtPublic void xt_xres_apply_in_order(XTThreadPtr self, XTWriterStatePtr ws, xtLogID log_id, xtLogOffset log_offset, XTXactLogBufferDPtr record)
13631363{
1364// XTDatabaseHPtr db, XTOpenTablePtr *ot, XTXactSeqReadPtr sr, XTDataBufferPtr databuf1364 xtOpSeqNo op_seq;
1365 switch (record->xl.xl_status_1) {1365 xtTableID tab_id;
1366 case XT_LOG_ENT_REC_MODIFIED:1366 size_t len;
1367 case XT_LOG_ENT_UPDATE:1367 xtBool check_index;
1368 case XT_LOG_ENT_INSERT:1368
1369 case XT_LOG_ENT_DELETE:1369// XTDatabaseHPtr db, XTOpenTablePtr *ot, XTXactSeqReadPtr sr, XTDataBufferPtr databuf
1370 case XT_LOG_ENT_UPDATE_BG:1370 switch (record->xl.xl_status_1) {
1371 case XT_LOG_ENT_INSERT_BG:1371 case XT_LOG_ENT_REC_MODIFIED:
1372 case XT_LOG_ENT_DELETE_BG:1372 case XT_LOG_ENT_UPDATE:
1373 len = offsetof(XTactUpdateEntryDRec, xu_rec_type_1) + (size_t) XT_GET_DISK_2(record->xu.xu_size_2);1373 case XT_LOG_ENT_INSERT:
1374 op_seq = XT_GET_DISK_4(record->xu.xu_op_seq_4);1374 case XT_LOG_ENT_DELETE:
1375 tab_id = XT_GET_DISK_4(record->xu.xu_tab_id_4);1375 case XT_LOG_ENT_UPDATE_BG:
1376 break;1376 case XT_LOG_ENT_INSERT_BG:
1377 case XT_LOG_ENT_UPDATE_FL:1377 case XT_LOG_ENT_DELETE_BG:
1378 case XT_LOG_ENT_INSERT_FL:1378 len = offsetof(XTactUpdateEntryDRec, xu_rec_type_1) + (size_t) XT_GET_DISK_2(record->xu.xu_size_2);
1379 case XT_LOG_ENT_DELETE_FL:1379 op_seq = XT_GET_DISK_4(record->xu.xu_op_seq_4);
1380 case XT_LOG_ENT_UPDATE_FL_BG:1380 tab_id = XT_GET_DISK_4(record->xu.xu_tab_id_4);
1381 case XT_LOG_ENT_INSERT_FL_BG:1381 break;
1382 case XT_LOG_ENT_DELETE_FL_BG:1382 case XT_LOG_ENT_UPDATE_FL:
1383 len = offsetof(XTactUpdateFLEntryDRec, xf_rec_type_1) + (size_t) XT_GET_DISK_2(record->xf.xf_size_2);1383 case XT_LOG_ENT_INSERT_FL:
1384 op_seq = XT_GET_DISK_4(record->xf.xf_op_seq_4);1384 case XT_LOG_ENT_DELETE_FL:
1385 tab_id = XT_GET_DISK_4(record->xf.xf_tab_id_4);1385 case XT_LOG_ENT_UPDATE_FL_BG:
1386 break;1386 case XT_LOG_ENT_INSERT_FL_BG:
1387 case XT_LOG_ENT_REC_FREED:1387 case XT_LOG_ENT_DELETE_FL_BG:
1388 case XT_LOG_ENT_REC_REMOVED:1388 len = offsetof(XTactUpdateFLEntryDRec, xf_rec_type_1) + (size_t) XT_GET_DISK_2(record->xf.xf_size_2);
1389 case XT_LOG_ENT_REC_REMOVED_EXT:1389 op_seq = XT_GET_DISK_4(record->xf.xf_op_seq_4);
1390 /* [(7)] REMOVE is now a extended version of FREE! */1390 tab_id = XT_GET_DISK_4(record->xf.xf_tab_id_4);
1391 len = offsetof(XTactFreeRecEntryDRec, fr_rec_type_1) + sizeof(XTTabRecFreeDRec);1391 break;
1392 goto fixed_len_data;1392 case XT_LOG_ENT_REC_FREED:
1393 case XT_LOG_ENT_REC_REMOVED_BI:1393 case XT_LOG_ENT_REC_REMOVED:
1394 len = offsetof(XTactRemoveBIEntryDRec, rb_rec_type_1) + (size_t) XT_GET_DISK_2(record->rb.rb_size_2);1394 case XT_LOG_ENT_REC_REMOVED_EXT:
1395 op_seq = XT_GET_DISK_4(record->rb.rb_op_seq_4);1395 /* [(7)] REMOVE is now a extended version of FREE! */
1396 tab_id = XT_GET_DISK_4(record->rb.rb_tab_id_4);1396 len = offsetof(XTactFreeRecEntryDRec, fr_rec_type_1) + sizeof(XTTabRecFreeDRec);
1397 break;1397 goto fixed_len_data;
1398 case XT_LOG_ENT_REC_MOVED:1398 case XT_LOG_ENT_REC_REMOVED_BI:
1399 len = offsetof(XTactWriteRecEntryDRec, xw_rec_type_1) + 8;1399 len = offsetof(XTactRemoveBIEntryDRec, rb_rec_type_1) + (size_t) XT_GET_DISK_2(record->rb.rb_size_2);
1400 goto fixed_len_data;1400 op_seq = XT_GET_DISK_4(record->rb.rb_op_seq_4);
1401 case XT_LOG_ENT_REC_CLEANED:1401 tab_id = XT_GET_DISK_4(record->rb.rb_tab_id_4);
1402 len = offsetof(XTactWriteRecEntryDRec, xw_rec_type_1) + offsetof(XTTabRecHeadDRec, tr_prev_rec_id_4) + XT_RECORD_ID_SIZE;1402 break;
1403 goto fixed_len_data;1403 case XT_LOG_ENT_REC_MOVED:
1404 case XT_LOG_ENT_REC_CLEANED_1:1404 len = offsetof(XTactWriteRecEntryDRec, xw_rec_type_1) + 8;
1405 len = offsetof(XTactWriteRecEntryDRec, xw_rec_type_1) + 1;1405 goto fixed_len_data;
1406 goto fixed_len_data;1406 case XT_LOG_ENT_REC_CLEANED:
1407 case XT_LOG_ENT_REC_UNLINKED:1407 len = offsetof(XTactWriteRecEntryDRec, xw_rec_type_1) + offsetof(XTTabRecHeadDRec, tr_prev_rec_id_4) + XT_RECORD_ID_SIZE;
1408 len = offsetof(XTactWriteRecEntryDRec, xw_rec_type_1) + offsetof(XTTabRecHeadDRec, tr_prev_rec_id_4) + XT_RECORD_ID_SIZE;1408 goto fixed_len_data;
1409 fixed_len_data:1409 case XT_LOG_ENT_REC_CLEANED_1:
1410 op_seq = XT_GET_DISK_4(record->xw.xw_op_seq_4);1410 len = offsetof(XTactWriteRecEntryDRec, xw_rec_type_1) + 1;
1411 tab_id = XT_GET_DISK_4(record->xw.xw_tab_id_4);1411 goto fixed_len_data;
1412 break;1412 case XT_LOG_ENT_REC_UNLINKED:
1413 case XT_LOG_ENT_ROW_NEW:1413 len = offsetof(XTactWriteRecEntryDRec, xw_rec_type_1) + offsetof(XTTabRecHeadDRec, tr_prev_rec_id_4) + XT_RECORD_ID_SIZE;
1414 len = sizeof(XTactRowAddedEntryDRec) - 4;1414 fixed_len_data:
1415 goto new_row;1415 op_seq = XT_GET_DISK_4(record->xw.xw_op_seq_4);
1416 case XT_LOG_ENT_ROW_NEW_FL:1416 tab_id = XT_GET_DISK_4(record->xw.xw_tab_id_4);
1417 len = sizeof(XTactRowAddedEntryDRec);1417 break;
1418 new_row:1418 case XT_LOG_ENT_ROW_NEW:
1419 op_seq = XT_GET_DISK_4(record->xa.xa_op_seq_4);1419 len = sizeof(XTactRowAddedEntryDRec) - 4;
1420 tab_id = XT_GET_DISK_4(record->xa.xa_tab_id_4);1420 goto new_row;
1421 break;1421 case XT_LOG_ENT_ROW_NEW_FL:
1422 case XT_LOG_ENT_ROW_ADD_REC:1422 len = sizeof(XTactRowAddedEntryDRec);
1423 case XT_LOG_ENT_ROW_SET:1423 new_row:
1424 case XT_LOG_ENT_ROW_FREED:1424 op_seq = XT_GET_DISK_4(record->xa.xa_op_seq_4);
1425 len = offsetof(XTactWriteRowEntryDRec, wr_ref_id_4) + sizeof(XTTabRowRefDRec);1425 tab_id = XT_GET_DISK_4(record->xa.xa_tab_id_4);
1426 op_seq = XT_GET_DISK_4(record->wr.wr_op_seq_4);1426 break;
1427 tab_id = XT_GET_DISK_4(record->wr.wr_tab_id_4);1427 case XT_LOG_ENT_ROW_ADD_REC:
1428 break;1428 case XT_LOG_ENT_ROW_SET:
1429 case XT_LOG_ENT_NO_OP:1429 case XT_LOG_ENT_ROW_FREED:
1430 case XT_LOG_ENT_END_OF_LOG:1430 len = offsetof(XTactWriteRowEntryDRec, wr_ref_id_4) + sizeof(XTTabRowRefDRec);
1431 return;1431 op_seq = XT_GET_DISK_4(record->wr.wr_op_seq_4);
1432 default:1432 tab_id = XT_GET_DISK_4(record->wr.wr_tab_id_4);
1433 return;1433 break;
1434 }1434 case XT_LOG_ENT_NO_OP:
14351435 case XT_LOG_ENT_END_OF_LOG:
1436 if (!xres_open_table(self, ws, tab_id))1436 return;
1437 return;1437 default:
14381438 return;
1439 XTTableHPtr tab = ws->ws_ot->ot_table;1439 }
14401440
1441 /* NOTE:1441 if (!xres_open_table(self, ws, tab_id))
1442 *1442 return;
1443 * During normal operation this is actually given.1443
1444 *1444 XTTableHPtr tab = ws->ws_ot->ot_table;
1445 * During recovery, it only applies to the record/row files1445
1446 * The index file is flushed indepently, and changes may1446 /* NOTE:
1447 * have been applied to the index (due to a call to flush index,1447 *
1448 * which comes as a result of out of memory) that have not been1448 * During normal operation this is actually given.
1449 * applied to the record/row files.1449 *
1450 *1450 * During recovery, it only applies to the record/row files
1451 * As a result we need to do the index checks that apply to this1451 * The index file is flushed indepently, and changes may
1452 * change.1452 * have been applied to the index (due to a call to flush index,
1453 *1453 * which comes as a result of out of memory) that have not been
1454 * At the moment, I will just do everything, which should not1454 * applied to the record/row files.
1455 * hurt!1455 *
1456 *1456 * As a result we need to do the index checks that apply to this
1457 * This error can be repeated by running the test1457 * change.
1458 * runTest(OUT_OF_CACHE_UPDATE_TEST, 32, OUT_OF_CACHE_UPDATE_TEST_UPDATE_COUNT, OUT_OF_CACHE_UPDATE_TEST_SET_SIZE)1458 *
1459 * and crashing after a while.1459 * At the moment, I will just do everything, which should not
1460 *1460 * hurt!
1461 * Do this by setting not_this to NULL. This will cause the test to1461 *
1462 * hang after a while. After a restart the indexes are corrupt if the1462 * This error can be repeated by running the test
1463 * ws->ws_in_recover condition is not present here. 1463 * runTest(OUT_OF_CACHE_UPDATE_TEST, 32, OUT_OF_CACHE_UPDATE_TEST_UPDATE_COUNT, OUT_OF_CACHE_UPDATE_TEST_SET_SIZE)
1464 */1464 * and crashing after a while.
1465 if (ws->ws_in_recover) {1465 *
1466 if (!tab->tab_recovery_done) {1466 * Do this by setting not_this to NULL. This will cause the test to
1467 /* op_seq <= tab_head_op_seq + 1: */1467 * hang after a while. After a restart the indexes are corrupt if the
1468 ASSERT(XTTableSeq::xt_op_is_before(op_seq, tab->tab_head_op_seq+2));1468 * ws->ws_in_recover condition is not present here.
1469 if (XTTableSeq::xt_op_is_before(op_seq-1, tab->tab_head_op_seq))1469 */
1470 /* Adjust the operation sequence number: */1470 if (ws->ws_in_recover) {
1471 tab->tab_head_op_seq = op_seq-1;1471 if (!tab->tab_recovery_done) {
1472 tab->tab_recovery_done = TRUE;1472 /* op_seq <= tab_head_op_seq + 1: */
1473 }1473 ASSERT(XTTableSeq::xt_op_is_before(op_seq, tab->tab_head_op_seq+2));
1474 }1474 if (XTTableSeq::xt_op_is_before(op_seq-1, tab->tab_head_op_seq))
14751475 /* Adjust the operation sequence number: */
1476 if (!XTTableSeq::xt_op_is_before(tab->tab_head_op_seq, op_seq))1476 tab->tab_head_op_seq = op_seq-1;
1477 return;1477 tab->tab_recovery_done = TRUE;
14781478 }
1479 if (tab->tab_head_op_seq+1 == op_seq) {1479 }
1480 /* I could use tab_ind_rec_log_id, but this may be a problem, if1480
1481 * recovery does not recover up to the last committed transaction.1481 if (!XTTableSeq::xt_op_is_before(tab->tab_head_op_seq, op_seq))
1482 */ 1482 return;
1483 check_index = ws->ws_in_recover && xt_comp_log_pos(log_id, log_offset, ws->ws_ind_rec_log_id, ws->ws_ind_rec_log_offset) >= 0;1483
1484 xres_apply_change(self, ws->ws_ot, record, TRUE, check_index, &ws->ws_rec_buf);1484 if (tab->tab_head_op_seq+1 == op_seq) {
1485 tab->tab_head_op_seq = op_seq;1485 /* I could use tab_ind_rec_log_id, but this may be a problem, if
1486 if (tab->tab_wr_wake_freeer) {1486 * recovery does not recover up to the last committed transaction.
1487 if (!XTTableSeq::xt_op_is_before(tab->tab_head_op_seq, tab->tab_wake_freeer_op))1487 */
1488 xt_wr_wake_freeer(self);1488 check_index = ws->ws_in_recover && xt_comp_log_pos(log_id, log_offset, ws->ws_ind_rec_log_id, ws->ws_ind_rec_log_offset) >= 0;
1489 }1489 xres_apply_change(self, ws->ws_ot, record, TRUE, check_index, &ws->ws_rec_buf);
14901490 tab->tab_head_op_seq = op_seq;
1491 /* Apply any operations in the list that now follow on...1491 if (tab->tab_wr_wake_freeer) {
1492 * NOTE: the tab_op_list only has be locked for modification.1492 if (!XTTableSeq::xt_op_is_before(tab->tab_head_op_seq, tab->tab_wake_freeer_op))
1493 * This is because only one thread ever changes the list1493 xt_wr_wake_freeer(self);
1494 * (on startup and the writer), but the checkpoint thread1494 }
1495 * reads it.1495
1496 */ 1496 /* Apply any operations in the list that now follow on...
1497 XTOperationPtr op;1497 * NOTE: the tab_op_list only has be locked for modification.
1498 if ((op = (XTOperationPtr) xt_sl_first_item(tab->tab_op_list))) {1498 * This is because only one thread ever changes the list
1499 if (tab->tab_head_op_seq+1 == op->or_op_seq) {1499 * (on startup and the writer), but the checkpoint thread
1500 xres_apply_operations(self, ws, TRUE);1500 * reads it.
1501 }1501 */
1502 }1502 XTOperationPtr op;
1503 }1503 if ((op = (XTOperationPtr) xt_sl_first_item(tab->tab_op_list))) {
1504 else {1504 if (tab->tab_head_op_seq+1 == op->or_op_seq) {
1505 /* Add the operation to the list: */1505 xres_apply_operations(self, ws, TRUE);
1506 XTOperationRec op;1506 }
15071507 }
1508 op.or_op_seq = op_seq;1508 }
1509 op.or_op_len = len;1509 else {
1510 op.or_log_id = log_id;1510 /* Add the operation to the list: */
1511 op.or_log_offset = log_offset;1511 XTOperationRec op;
1512 xt_sl_lock(self, tab->tab_op_list);1512
1513 xt_sl_insert(self, tab->tab_op_list, &op_seq, &op);1513 op.or_op_seq = op_seq;
1514 ASSERT(tab->tab_op_list->sl_usage_count < 1000000);1514 op.or_op_len = len;
1515 xt_sl_unlock(self, tab->tab_op_list);1515 op.or_log_id = log_id;
1516 }1516 op.or_log_offset = log_offset;
1517}1517 xt_sl_lock(self, tab->tab_op_list);
15181518 xt_sl_insert(self, tab->tab_op_list, &op_seq, &op);
1519/* ----------------------------------------------------------------------1519 ASSERT(tab->tab_op_list->sl_usage_count < 1000000);
1520 * CHECKPOINTING FUNCTIONALITY1520 xt_sl_unlock(self, tab->tab_op_list);
1521 */1521 }
15221522}
1523static xtBool xres_delete_data_log(XTDatabaseHPtr db, xtLogID log_id)1523
1524{1524/* ----------------------------------------------------------------------
1525 XTDataLogFilePtr data_log;1525 * CHECKPOINTING FUNCTIONALITY
1526 char path[PATH_MAX];1526 */
15271527
1528 db->db_datalogs.dlc_name(PATH_MAX, path, log_id);1528static xtBool xres_delete_data_log(XTDatabaseHPtr db, xtLogID log_id)
15291529{
1530 if (!db->db_datalogs.dlc_remove_data_log(log_id, TRUE))1530 XTDataLogFilePtr data_log;
1531 return FAILED;1531 char path[PATH_MAX];
15321532
1533 if (xt_fs_exists(path)) {1533 db->db_datalogs.dlc_name(PATH_MAX, path, log_id);
1534#ifdef DEBUG_LOG_DELETE1534
1535 printf("-- delete log: %s\n", path);1535 if (!db->db_datalogs.dlc_remove_data_log(log_id, TRUE))
1536#endif1536 return FAILED;
1537 if (!xt_fs_delete(NULL, path))1537
1538 return FAILED;1538 if (xt_fs_exists(path)) {
1539 }1539#ifdef DEBUG_LOG_DELETE
1540 /* The log was deleted: */1540 printf("-- delete log: %s\n", path);
1541 if (!db->db_datalogs.dlc_get_data_log(&data_log, log_id, TRUE, NULL))1541#endif
1542 return FAILED;1542 if (!xt_fs_delete(NULL, path))
1543 if (data_log) {1543 return FAILED;
1544 if (!db->db_datalogs.dls_set_log_state(data_log, XT_DL_DELETED))1544 }
1545 return FAILED;1545 /* The log was deleted: */
1546 }1546 if (!db->db_datalogs.dlc_get_data_log(&data_log, log_id, TRUE, NULL))
1547 return OK;1547 return FAILED;
1548}1548 if (data_log) {
15491549 if (!db->db_datalogs.dls_set_log_state(data_log, XT_DL_DELETED))
1550static int xres_comp_flush_tabs(XTThreadPtr XT_UNUSED(self), register const void *XT_UNUSED(thunk), register const void *a, register const void *b)1550 return FAILED;
1551{1551 }
1552 xtTableID tab_id = *((xtTableID *) a);1552 return OK;
1553 XTCheckPointTablePtr cp_tab = (XTCheckPointTablePtr) b;1553}
15541554
1555 if (tab_id < cp_tab->cpt_tab_id)1555static int xres_comp_flush_tabs(XTThreadPtr XT_UNUSED(self), register const void *XT_UNUSED(thunk), register const void *a, register const void *b)
1556 return -1;1556{
1557 if (tab_id > cp_tab->cpt_tab_id)1557 xtTableID tab_id = *((xtTableID *) a);
1558 return 1;1558 XTCheckPointTablePtr cp_tab = (XTCheckPointTablePtr) b;
1559 return 0;1559
1560}1560 if (tab_id < cp_tab->cpt_tab_id)
15611561 return -1;
1562static void xres_init_checkpoint_state(XTThreadPtr self, XTCheckPointStatePtr cp)1562 if (tab_id > cp_tab->cpt_tab_id)
1563{1563 return 1;
1564 xt_init_mutex_with_autoname(self, &cp->cp_state_lock);1564 return 0;
1565}1565}
15661566
1567static void xres_free_checkpoint_state(XTThreadPtr self, XTCheckPointStatePtr cp)1567static void xres_init_checkpoint_state(XTThreadPtr self, XTCheckPointStatePtr cp)
1568{1568{
1569 xt_free_mutex(&cp->cp_state_lock);1569 xt_init_mutex_with_autoname(self, &cp->cp_state_lock);
1570 if (cp->cp_table_ids) {1570}
1571 xt_free_sortedlist(self, cp->cp_table_ids);1571
1572 cp->cp_table_ids = NULL;1572static void xres_free_checkpoint_state(XTThreadPtr self, XTCheckPointStatePtr cp)
1573 }1573{
1574}1574 xt_free_mutex(&cp->cp_state_lock);
15751575 if (cp->cp_table_ids) {
1576/*1576 xt_free_sortedlist(self, cp->cp_table_ids);
1577 * Remove the deleted logs so that they can be re-used.1577 cp->cp_table_ids = NULL;
1578 * This is only possible after a checkpoint has been1578 }
1579 * written that does _not_ include these logs as logs1579}
1580 * to be deleted!1580
1581 */1581/*
1582static xtBool xres_remove_data_logs(XTDatabaseHPtr db)1582 * Remove the deleted logs so that they can be re-used.
1583{1583 * This is only possible after a checkpoint has been
1584 u_int no_of_logs = xt_sl_get_size(db->db_datalogs.dlc_deleted);1584 * written that does _not_ include these logs as logs
1585 xtLogID *log_id_ptr;1585 * to be deleted!
15861586 */
1587 for (u_int i=0; i<no_of_logs; i++) {1587static xtBool xres_remove_data_logs(XTDatabaseHPtr db)
1588 log_id_ptr = (xtLogID *) xt_sl_item_at(db->db_datalogs.dlc_deleted, i);1588{
1589 if (!db->db_datalogs.dlc_remove_data_log(*log_id_ptr, FALSE))1589 u_int no_of_logs = xt_sl_get_size(db->db_datalogs.dlc_deleted);
1590 return FAILED;1590 xtLogID *log_id_ptr;
1591 }1591
1592 xt_sl_set_size(db->db_datalogs.dlc_deleted, 0);1592 for (u_int i=0; i<no_of_logs; i++) {
1593 return OK;1593 log_id_ptr = (xtLogID *) xt_sl_item_at(db->db_datalogs.dlc_deleted, i);
1594}1594 if (!db->db_datalogs.dlc_remove_data_log(*log_id_ptr, FALSE))
15951595 return FAILED;
1596/* ----------------------------------------------------------------------1596 }
1597 * INIT & EXIT1597 xt_sl_set_size(db->db_datalogs.dlc_deleted, 0);
1598 */1598 return OK;
15991599}
1600xtPublic void xt_xres_init(XTThreadPtr self, XTDatabaseHPtr db)1600
1601{1601/* ----------------------------------------------------------------------
1602 xtLogID max_log_id;1602 * INIT & EXIT
16031603 */
1604 xt_init_mutex_with_autoname(self, &db->db_cp_lock);1604
1605 xt_init_cond(self, &db->db_cp_cond);1605xtPublic void xt_xres_init(XTThreadPtr self, XTDatabaseHPtr db)
1606 1606{
1607 xres_init_checkpoint_state(self, &db->db_cp_state);1607 xtLogID max_log_id;
1608 db->db_restart.xres_init(self, db, &db->db_wr_log_id, &db->db_wr_log_offset, &max_log_id);1608
16091609 xt_init_mutex_with_autoname(self, &db->db_cp_lock);
1610 /* It is also the position where transactions will start writing the1610 xt_init_cond(self, &db->db_cp_cond);
1611 * log:1611
1612 */1612 xres_init_checkpoint_state(self, &db->db_cp_state);
1613 if (!db->db_xlog.xlog_set_write_offset(db->db_wr_log_id, db->db_wr_log_offset, max_log_id, self))1613 db->db_restart.xres_init(self, db, &db->db_wr_log_id, &db->db_wr_log_offset, &max_log_id);
1614 xt_throw(self);1614
1615}1615 /* It is also the position where transactions will start writing the
16161616 * log:
1617xtPublic void xt_xres_exit(XTThreadPtr self, XTDatabaseHPtr db)1617 */
1618{1618 if (!db->db_xlog.xlog_set_write_offset(db->db_wr_log_id, db->db_wr_log_offset, max_log_id, self))
1619 db->db_restart.xres_exit(self);1619 xt_throw(self);
1620 xres_free_checkpoint_state(self, &db->db_cp_state);1620}
1621 xt_free_mutex(&db->db_cp_lock);1621
1622 xt_free_cond(&db->db_cp_cond);1622xtPublic void xt_xres_exit(XTThreadPtr self, XTDatabaseHPtr db)
1623}1623{
16241624 db->db_restart.xres_exit(self);
1625/* ----------------------------------------------------------------------1625 xres_free_checkpoint_state(self, &db->db_cp_state);
1626 * RESTART FUNCTIONALITY1626 xt_free_mutex(&db->db_cp_lock);
1627 */1627 xt_free_cond(&db->db_cp_cond);
16281628}
1629/*1629
1630 * Restart the database. This function loads the restart position, and1630/* ----------------------------------------------------------------------
1631 * applies all changes in the logs, until the end of the log, or1631 * RESTART FUNCTIONALITY
1632 * a corrupted record is found.1632 */
1633 *1633
1634 * The restart position is the position in the log where we know that1634/*
1635 * all the changes up to that point have been flushed to the1635 * Restart the database. This function loads the restart position, and
1636 * database.1636 * applies all changes in the logs, until the end of the log, or
1637 *1637 * a corrupted record is found.
1638 * This is called the checkpoint position. The checkpoint position1638 *
1639 * is written alternatively to 2 restart files.1639 * The restart position is the position in the log where we know that
1640 *1640 * all the changes up to that point have been flushed to the
1641 * To make a checkpoint:1641 * database.
1642 * Get the current log writer log offset.1642 *
1643 * For each table:1643 * This is called the checkpoint position. The checkpoint position
1644 * Get the log offset of the next operation on the table, if an1644 * is written alternatively to 2 restart files.
1645 * operation is queued for the table.1645 *
1646 * Flush that table, and the operation sequence to the table.1646 * To make a checkpoint:
1647 * For each unclean transaction:1647 * Get the current log writer log offset.
1648 * Get the log offset of the begin of the transaction.1648 * For each table:
1649 * Write the lowest of all log offsets to the restart file!1649 * Get the log offset of the next operation on the table, if an
1650 */1650 * operation is queued for the table.
16511651 * Flush that table, and the operation sequence to the table.
1652void XTXactRestart::xres_init(XTThreadPtr self, XTDatabaseHPtr db, xtLogID *log_id, xtLogOffset *log_offset, xtLogID *max_log_id)1652 * For each unclean transaction:
1653{1653 * Get the log offset of the begin of the transaction.
1654 char path[PATH_MAX];1654 * Write the lowest of all log offsets to the restart file!
1655 XTOpenFilePtr of = NULL;1655 */
1656 XTXlogCheckpointDPtr res_1_buffer = NULL;1656
1657 XTXlogCheckpointDPtr res_2_buffer = NULL;1657void XTXactRestart::xres_init(XTThreadPtr self, XTDatabaseHPtr db, xtLogID *log_id, xtLogOffset *log_offset, xtLogID *max_log_id)
1658 XTXlogCheckpointDPtr use_buffer;1658{
1659 xtLogID ind_rec_log_id = 0;1659 char path[PATH_MAX];
1660 xtLogOffset ind_rec_log_offset = 0;1660 XTOpenFilePtr of = NULL;
16611661 XTXlogCheckpointDPtr res_1_buffer = NULL;
1662 enter_();1662 XTXlogCheckpointDPtr res_2_buffer = NULL;
1663 xres_db = db;1663 XTXlogCheckpointDPtr use_buffer;
16641664 xtLogID ind_rec_log_id = 0;
1665 ASSERT(!self->st_database);1665 xtLogOffset ind_rec_log_offset = 0;
1666 /* The following call stack:1666
1667 * XTDatabaseLog::xlog_flush_pending()1667 enter_();
1668 * XTDatabaseLog::xlog_flush()1668 xres_db = db;
1669 * xt_xlog_flush_log()1669
1670 * xt_flush_indices()1670 ASSERT(!self->st_database);
1671 * idx_out_of_memory_failure()1671 /* The following call stack:
1672 * xt_idx_delete()1672 * XTDatabaseLog::xlog_flush_pending()
1673 * xres_remove_index_entries()1673 * XTDatabaseLog::xlog_flush()
1674 * xres_apply_change()1674 * xt_xlog_flush_log()
1675 * xt_xres_apply_in_order()1675 * xt_flush_indices()
1676 * XTXactRestart::xres_restart()1676 * idx_out_of_memory_failure()
1677 * XTXactRestart::xres_init()1677 * xt_idx_delete()
1678 * Leads to st_database being used!1678 * xres_remove_index_entries()
1679 */1679 * xres_apply_change()
1680 self->st_database = db;1680 * xt_xres_apply_in_order()
16811681 * XTXactRestart::xres_restart()
1682#ifdef SKIP_STARTUP_CHECKPOINT1682 * XTXactRestart::xres_init()
1683 /* When debugging, we do not checkpoint immediately, just in case1683 * Leads to st_database being used!
1684 * we detect a problem during recovery.1684 */
1685 */1685 self->st_database = db;
1686 xres_cp_required = FALSE;1686
1687#else1687#ifdef SKIP_STARTUP_CHECKPOINT
1688 xres_cp_required = TRUE;1688 /* When debugging, we do not checkpoint immediately, just in case
1689#endif1689 * we detect a problem during recovery.
1690 xres_cp_number = 0;1690 */
1691 try_(a) {1691 xres_cp_required = FALSE;
16921692#else
1693 /* Figure out which restart file to use.1693 xres_cp_required = TRUE;
1694 */1694#endif
1695 xres_name(PATH_MAX, path, 1);1695 xres_cp_number = 0;
1696 if ((of = xt_open_file(self, path, XT_FS_MISSING_OK))) {1696 try_(a) {
1697 size_t res_1_size;1697
16981698 /* Figure out which restart file to use.
1699 res_1_size = (size_t) xt_seek_eof_file(self, of);1699 */
1700 res_1_buffer = (XTXlogCheckpointDPtr) xt_malloc(self, res_1_size);1700 xres_name(PATH_MAX, path, 1);
1701 if (!xt_pread_file(of, 0, res_1_size, res_1_size, res_1_buffer, NULL, &self->st_statistics.st_x, self))1701 if ((of = xt_open_file(self, path, XT_FS_MISSING_OK))) {
1702 xt_throw(self);1702 size_t res_1_size;
1703 xt_close_file(self, of);1703
1704 of = NULL;1704 res_1_size = (size_t) xt_seek_eof_file(self, of);
1705 if (!xres_check_checksum(res_1_buffer, res_1_size)) {1705 res_1_buffer = (XTXlogCheckpointDPtr) xt_malloc(self, res_1_size);
1706 xt_free(self, res_1_buffer);1706 if (!xt_pread_file(of, 0, res_1_size, res_1_size, res_1_buffer, NULL, &self->st_statistics.st_x, self))
1707 res_1_buffer = NULL;1707 xt_throw(self);
1708 }1708 xt_close_file(self, of);
1709 }1709 of = NULL;
17101710 if (!xres_check_checksum(res_1_buffer, res_1_size)) {
1711 xres_name(PATH_MAX, path, 2);1711 xt_free(self, res_1_buffer);
1712 if ((of = xt_open_file(self, path, XT_FS_MISSING_OK))) {1712 res_1_buffer = NULL;
1713 size_t res_2_size;1713 }
17141714 }
1715 res_2_size = (size_t) xt_seek_eof_file(self, of);1715
1716 res_2_buffer = (XTXlogCheckpointDPtr) xt_malloc(self, res_2_size);1716 xres_name(PATH_MAX, path, 2);
1717 if (!xt_pread_file(of, 0, res_2_size, res_2_size, res_2_buffer, NULL, &self->st_statistics.st_x, self))1717 if ((of = xt_open_file(self, path, XT_FS_MISSING_OK))) {
1718 xt_throw(self);1718 size_t res_2_size;
1719 xt_close_file(self, of);1719
1720 of = NULL;1720 res_2_size = (size_t) xt_seek_eof_file(self, of);
1721 if (!xres_check_checksum(res_2_buffer, res_2_size)) {1721 res_2_buffer = (XTXlogCheckpointDPtr) xt_malloc(self, res_2_size);
1722 xt_free(self, res_2_buffer);1722 if (!xt_pread_file(of, 0, res_2_size, res_2_size, res_2_buffer, NULL, &self->st_statistics.st_x, self))
1723 res_2_buffer = NULL;1723 xt_throw(self);
1724 }1724 xt_close_file(self, of);
1725 }1725 of = NULL;
17261726 if (!xres_check_checksum(res_2_buffer, res_2_size)) {
1727 if (res_1_buffer && res_2_buffer) {1727 xt_free(self, res_2_buffer);
1728 if (xt_comp_log_pos(1728 res_2_buffer = NULL;
1729 XT_GET_DISK_4(res_1_buffer->xcp_log_id_4),1729 }
1730 XT_GET_DISK_6(res_1_buffer->xcp_log_offs_6),1730 }
1731 XT_GET_DISK_4(res_2_buffer->xcp_log_id_4),1731
1732 XT_GET_DISK_6(res_2_buffer->xcp_log_offs_6)) > 0) {1732 if (res_1_buffer && res_2_buffer) {
1733 /* The first log is the further along than the second: */1733 if (xt_comp_log_pos(
1734 xt_free(self, res_2_buffer);1734 XT_GET_DISK_4(res_1_buffer->xcp_log_id_4),
1735 res_2_buffer = NULL;1735 XT_GET_DISK_6(res_1_buffer->xcp_log_offs_6),
1736 }1736 XT_GET_DISK_4(res_2_buffer->xcp_log_id_4),
1737 else {1737 XT_GET_DISK_6(res_2_buffer->xcp_log_offs_6)) > 0) {
1738 if (XT_GET_DISK_6(res_1_buffer->xcp_chkpnt_no_6) >1738 /* The first log is the further along than the second: */
1739 XT_GET_DISK_6(res_2_buffer->xcp_chkpnt_no_6)) {1739 xt_free(self, res_2_buffer);
1740 xt_free(self, res_2_buffer);
1741 res_2_buffer = NULL;
1742 }
1743 else {
1744 xt_free(self, res_1_buffer);
1745 res_1_buffer = NULL;
1746 }
1747 }
1748 }
1749
1750 if (res_1_buffer) {
1751 use_buffer = res_1_buffer;
1752 xres_next_res_no = 2;
1753 }
1754 else {
1755 use_buffer = res_2_buffer;
1756 xres_next_res_no = 1;
1757 }
1758
1759 /* Read the checkpoint data: */
1760 if (use_buffer) {
1761 u_int no_of_logs;
1762 xtLogID xt_log_id;
1763 xtTableID xt_tab_id;
1764
1765 xres_cp_number = XT_GET_DISK_6(use_buffer->xcp_chkpnt_no_6);
1766 xres_cp_log_id = XT_GET_DISK_4(use_buffer->xcp_log_id_4);
1767 xres_cp_log_offset = XT_GET_DISK_6(use_buffer->xcp_log_offs_6);
1768 xt_tab_id = XT_GET_DISK_4(use_buffer->xcp_tab_id_4);
1769 if (xt_tab_id > db->db_curr_tab_id)
1770 db->db_curr_tab_id = xt_tab_id;
1771 db->db_xn_curr_id = XT_GET_DISK_4(use_buffer->xcp_xact_id_4);
1772 ind_rec_log_id = XT_GET_DISK_4(use_buffer->xcp_ind_rec_log_id_4);
1773 ind_rec_log_offset = XT_GET_DISK_6(use_buffer->xcp_ind_rec_log_offs_6);
1774 no_of_logs = XT_GET_DISK_2(use_buffer->xcp_log_count_2);
1775
1776#ifdef DEBUG_PRINT
1777 printf("CHECKPOINT log=%d offset=%d ", (int) xres_cp_log_id, (int) xres_cp_log_offset);
1778 if (no_of_logs)
1779 printf("DELETED LOGS: ");
1780#endif
1781
1782 /* Logs that are deleted are locked until _after_ the next
1783 * checkpoint.
1784 *
1785 * To prevent the following problem from occuring:
1786 * - Recovery is performed, and log X is deleted
1787 * - After delete a log is free for re-use.
1788 * New data is writen to log X.
1789 * - Server crashes.
1790 * - Recovery is performed from previous checkpoint,
1791 * and log X is deleted again.
1792 *
1793 * To lock the logs the are placed on the deleted list.
1794 * After the next checkpoint, all logs on this list
1795 * will be removed.
1796 */
1797 for (u_int i=0; i<no_of_logs; i++) {
1798 xt_log_id = (xtLogID) XT_GET_DISK_2(use_buffer->xcp_del_log[i]);
1799#ifdef DEBUG_PRINT
1800 if (i != 0)
1801 printf(", ");
1802 printf("%d", (int) xt_log_id);
1803#endif
1804#ifdef DEBUG_KEEP_LOGS
1805 xt_dl_set_to_delete(self, db, xt_log_id);
1806#else
1807 if (!xres_delete_data_log(db, xt_log_id))
1808 xt_throw(self);
1809#endif
1810 }
1811
1812#ifdef DEBUG_PRINT
1813 printf("\n");
1814#endif
1815 }
1816 else {
1817 /* Try to determine the correct start point. */
1818 xres_cp_number = 0;
1819 xres_cp_log_id = xt_xlog_get_min_log(self, db);
1820 xres_cp_log_offset = 0;
1821 ind_rec_log_id = xres_cp_log_id;
1822 ind_rec_log_offset = xres_cp_log_offset;
1823
1824#ifdef DEBUG_PRINT
1825 printf("CHECKPOINT log=1 offset=0\n");
1826#endif
1827 }
1828
1829 if (res_1_buffer) {
1830 xt_free(self, res_1_buffer);
1831 res_1_buffer = NULL;
1832 }
1833 if (res_2_buffer) {
1834 xt_free(self, res_2_buffer);
1835 res_2_buffer = NULL;
1836 }
1837
1838 if (!xres_restart(self, log_id, log_offset, ind_rec_log_id, ind_rec_log_offset, max_log_id))
1839 xt_throw(self);
1840 }
1841 catch_(a) {
1842 self->st_database = NULL;
1843 if (of)
1844 xt_close_file(self, of);
1845 if (res_1_buffer)
1846 xt_free(self, res_1_buffer);
1847 if (res_2_buffer)
1848 xt_free(self, res_2_buffer);
1849 xres_exit(self);
1850 throw_();
1851 }
1852 cont_(a);
1853 self->st_database = NULL;
1854
1855 exit_();
1856}
1857
1858void XTXactRestart::xres_exit(XTThreadPtr XT_UNUSED(self))
1859{
1860}
1861
1862void XTXactRestart::xres_name(size_t size, char *path, xtLogID log_id)
1863{
1864 char name[50];
1865
1866 sprintf(name, "restart-%lu.xt", (u_long) log_id);
1867 xt_strcpy(size, path, xres_db->db_main_path);
1868 xt_add_system_dir(size, path);
1869 xt_add_dir_char(size, path);
1870 xt_strcat(size, path, name);
1871}
1872
1873xtBool XTXactRestart::xres_check_checksum(XTXlogCheckpointDPtr buffer, size_t size)
1874{
1875 size_t head_size;
1876
1877 /* The minimum size: */
1878 if (size < offsetof(XTXlogCheckpointDRec, xcp_head_size_4) + 4)
1879 return FAILED;
1880
1881 /* Check the sizes: */
1882 head_size = XT_GET_DISK_4(buffer->xcp_head_size_4);
1883 if (size < head_size)
1884 return FAILED;
1885
1886 if (XT_GET_DISK_2(buffer->xcp_checksum_2) != xt_get_checksum(((xtWord1 *) buffer) + 2, size - 2, 1))
1887 return FAILED;
1888
1889 if (XT_GET_DISK_2(buffer->xcp_version_2) != XT_CHECKPOINT_VERSION)
1890 return FAILED;
1891
1892 return OK;
1893}
1894
1895void XTXactRestart::xres_recover_progress(XTThreadPtr self, XTOpenFilePtr *of, int perc)
1896{
1897#ifdef XT_USE_GLOBAL_DB
1898 if (!perc) {
1899 char file_path[PATH_MAX];
1900
1901 xt_strcpy(PATH_MAX, file_path, xres_db->db_main_path);
1902 xt_add_pbxt_file(PATH_MAX, file_path, "recovery-progress");
1903 *of = xt_open_file(self, file_path, XT_FS_CREATE | XT_FS_MAKE_PATH);
1904 xt_set_eof_file(self, *of, 0);
1905 }
1906
1907 if (perc > 100) {
1908 char file_path[PATH_MAX];
1909
1910 if (*of) {
1911 xt_close_file(self, *of);
1912 *of = NULL;
1913 }
1914 xt_strcpy(PATH_MAX, file_path, xres_db->db_main_path);
1915 xt_add_pbxt_file(PATH_MAX, file_path, "recovery-progress");
1916 if (xt_fs_exists(file_path))
1917 xt_fs_delete(self, file_path);
1918 }
1919 else {
1920 char number[40];
1921
1922 sprintf(number, "%d", perc);
1923 if (!xt_pwrite_file(*of, 0, strlen(number), number, &self->st_statistics.st_x, self))
1924 xt_throw(self);
1925 if (!xt_flush_file(*of, &self->st_statistics.st_x, self))
1926 xt_throw(self);
1927 }
1928#endif
1929}
1930
/*
 * Run recovery: read the transaction log sequentially, starting at the
 * checkpoint position (xres_cp_log_id / xres_cp_log_offset), and process
 * each record according to its status byte.
 *
 * self                calling thread (used for logging, throwing and the
 *                     data log buffer).
 * log_id, log_offset  out: position of the last record read; only set
 *                     when the function succeeds.
 * ind_rec_log_id,
 * ind_rec_log_offset  copied into the writer state (ws_ind_rec_log_*).
 * max_log_id          out: filled in by xres_bytes_to_read().
 *
 * Returns TRUE on success. Any exception thrown inside the read loop is
 * caught here and turned into a FALSE return value.
 *
 * If at least 10 MB are to be read, progress percentages are logged and
 * written to the progress file via xres_recover_progress().
 */
1931xtBool XTXactRestart::xres_restart(XTThreadPtr self, xtLogID *log_id, xtLogOffset *log_offset, xtLogID ind_rec_log_id, xtLogOffset ind_rec_log_offset, xtLogID *max_log_id)
1932{
1933 xtBool ok = TRUE;
1934 XTDatabaseHPtr db = xres_db;
1935 XTXactLogBufferDPtr record;
1936 xtXactID xn_id;
1937 XTXactDataPtr xact;
1938 xtTableID tab_id;
1939 XTWriterStateRec ws;
1940 off_t bytes_read = 0;
1941 off_t bytes_to_read;
1942 volatile xtBool print_progress = FALSE;
1943 volatile off_t perc_size = 0, next_goal = 0;
1944 int perc_complete = 1;
1945 XTOpenFilePtr progress_file = NULL;
1946 xtBool min_ram_xn_id_set = FALSE;
1947 u_int log_count;
1948
1949 memset(&ws, 0, sizeof(ws));
1950
1951 ws.ws_db = db;
1952 ws.ws_in_recover = TRUE;
1953 ws.ws_ind_rec_log_id = ind_rec_log_id;
1954 ws.ws_ind_rec_log_offset = ind_rec_log_offset;
1955
1956 /* Initialize the data log buffer (required if extended data is
1957 * referenced).
1958 * Note: this buffer is freed later. It is part of the thread
1959 * "open database" state, and this means that a thread
1960 * may not have another database open (in use) when
1961 * it calls this functions.
1962 */
1963 self->st_dlog_buf.dlb_init(db, xt_db_log_buffer_size);
1964
 /* NOTE(review): this early return bypasses the cleanup at 'failed:'
  * (xt_free_writer_state / dlb_exit) - confirm that nothing set up so
  * far needs to be released.
  */
1965 if (!db->db_xlog.xlog_seq_init(&ws.ws_seqread, xt_db_log_buffer_size, TRUE))
1966 return FAILED;
1967
1968 bytes_to_read = xres_bytes_to_read(self, db, &log_count, max_log_id);
1969 /* Don't print anything about recovering an empty database: */
1970 if (bytes_to_read != 0)
1971 xt_logf(XT_NT_INFO, "PBXT: Recovering from %lu-%llu, bytes to read: %llu\n", (u_long) xres_cp_log_id, (u_llong) xres_cp_log_offset, (u_llong) bytes_to_read);
 /* Progress is only reported for 10 MB or more; each "goal" is 1% of
  * the estimated total.
  */
1972 if (bytes_to_read >= 10*1024*1024) {
1973 print_progress = TRUE;
1974 perc_size = bytes_to_read / 100;
1975 next_goal = perc_size;
1976 xres_recover_progress(self, &progress_file, 0);
1977 }
1978
1979 if (!db->db_xlog.xlog_seq_start(&ws.ws_seqread, xres_cp_log_id, xres_cp_log_offset, FALSE)) {
1980 ok = FALSE;
1981 goto failed;
1982 }
1983
1984 try_(a) {
1985 for (;;) {
1986 if (!db->db_xlog.xlog_seq_next(&ws.ws_seqread, &record, TRUE, self)) {
1987 ok = FALSE;
1988 break;
1989 }
1990 /* Increment before. If record is NULL then xseq_record_len will be zero,
1991 * UNLESS the last record was of type XT_LOG_ENT_END_OF_LOG
1992 * which fills the log to align to block of size 512.
1993 */
1994 bytes_read += ws.ws_seqread.xseq_record_len;
 /* A NULL record ends the read loop. */
1995 if (!record)
1996 break;
1997#ifdef PRINT_LOG_ON_RECOVERY
1998 xt_print_log_record(ws.ws_seqread.xseq_rec_log_id, ws.ws_seqread.xseq_rec_log_offset, record);
1999#endif
 /* Percentages are logged 25 per line, each line prefixed "PBXT: ". */
2000 if (print_progress && bytes_read > next_goal) {
2001 if (((perc_complete - 1) % 25) == 0)
2002 xt_logf(XT_NT_INFO, "PBXT: ");
2003 if ((perc_complete % 25) == 0)
2004 xt_logf(XT_NT_INFO, "%2d\n", (int) perc_complete);
2005 else
2006 xt_logf(XT_NT_INFO, "%2d ", (int) perc_complete);
2007 xt_log_flush(self);
2008 xres_recover_progress(self, &progress_file, perc_complete);
2009 next_goal += perc_size;
2010 perc_complete++;
2011 }
2012 switch (record->xl.xl_status_1) {
2013 case XT_LOG_ENT_HEADER:
2014 break;
2015 case XT_LOG_ENT_NEW_LOG: {
2016 /* Adjust the bytes read for the fact that logs are written
2017 * on 512 byte boundaries.
2018 */
2019 off_t offs, eof = ws.ws_seqread.xseq_log_eof;
2020
2021 offs = ws.ws_seqread.xseq_rec_log_offset + ws.ws_seqread.xseq_record_len;
2022 if (eof > offs)
2023 bytes_read += eof - offs;
 /* Continue reading at offset 0 of the log named in the record. */
2024 if (!db->db_xlog.xlog_seq_start(&ws.ws_seqread, XT_GET_DISK_4(record->xl.xl_log_id_4), 0, TRUE))
2025 xt_throw(self);
2026 break;
2027 }
2028 case XT_LOG_ENT_NEW_TAB:
 /* Keep db_curr_tab_id at the highest table ID seen. */
2029 tab_id = XT_GET_DISK_4(record->xt.xt_tab_id_4);
2030 if (tab_id > db->db_curr_tab_id)
2031 db->db_curr_tab_id = tab_id;
2032 break;
2033 case XT_LOG_ENT_UPDATE_BG:
2034 case XT_LOG_ENT_INSERT_BG:
2035 case XT_LOG_ENT_DELETE_BG:
2036 xn_id = XT_GET_DISK_4(record->xu.xu_xact_id_4);
2037 goto start_xact;
2038 case XT_LOG_ENT_UPDATE_FL_BG:
2039 case XT_LOG_ENT_INSERT_FL_BG:
2040 case XT_LOG_ENT_DELETE_FL_BG:
2041 xn_id = XT_GET_DISK_4(record->xf.xf_xact_id_4);
2042 start_xact:
 /* Both groups of "_BG" records register the transaction and then
  * apply the record; they differ only in where the ID is stored.
  */
2043 if (xt_xn_is_before(db->db_xn_curr_id, xn_id))
2044 db->db_xn_curr_id = xn_id;
2045
2046 if (!(xact = xt_xn_add_old_xact(db, xn_id, self)))
2047 xt_throw(self);
2048
2049 xact->xd_begin_log = ws.ws_seqread.xseq_rec_log_id;
2050 xact->xd_begin_offset = ws.ws_seqread.xseq_rec_log_offset;
2051
2052 xact->xd_end_xn_id = xn_id;
2053 xact->xd_end_time = db->db_xn_end_time;
2054 xact->xd_flags = (XT_XN_XAC_LOGGED | XT_XN_XAC_ENDED | XT_XN_XAC_RECOVERED | XT_XN_XAC_SWEEP);
2055
2056 /* This may affect the "minimum RAM transaction": */
2057 if (!min_ram_xn_id_set || xt_xn_is_before(xn_id, db->db_xn_min_ram_id)) {
2058 min_ram_xn_id_set = TRUE;
2059 db->db_xn_min_ram_id = xn_id;
2060 }
2061 xt_xres_apply_in_order(self, &ws, ws.ws_seqread.xseq_rec_log_id, ws.ws_seqread.xseq_rec_log_offset, record);
2062 break;
2063 case XT_LOG_ENT_COMMIT:
2064 case XT_LOG_ENT_ABORT:
 /* End records for transactions that are not known are ignored. */
2065 xn_id = XT_GET_DISK_4(record->xe.xe_xact_id_4);
2066 if ((xact = xt_xn_get_xact(db, xn_id, self))) {
2067 xact->xd_end_xn_id = xn_id;
2068 xact->xd_flags |= XT_XN_XAC_ENDED | XT_XN_XAC_SWEEP;
2069 xact->xd_flags &= ~XT_XN_XAC_RECOVERED; // We can expect an end record on cleanup!
2070 if (record->xl.xl_status_1 == XT_LOG_ENT_COMMIT)
2071 xact->xd_flags |= XT_XN_XAC_COMMITTED;
2072 }
2073 break;
2074 case XT_LOG_ENT_CLEANUP:
2075 /* The transaction was cleaned up: */
2076 xn_id = XT_GET_DISK_4(record->xc.xc_xact_id_4);
2077 xt_xn_delete_xact(db, xn_id, self);
2078 break;
2079 case XT_LOG_ENT_OP_SYNC:
2080 xres_sync_operations(self, db, &ws);
2081 break;
2082 case XT_LOG_ENT_DEL_LOG:
2083 xtLogID rec_log_id;
2084
2085 rec_log_id = XT_GET_DISK_4(record->xl.xl_log_id_4);
2086 xt_dl_set_to_delete(self, db, rec_log_id);
2087 break;
2088 default:
 /* All other record types are handed to the in-order apply routine. */
2089 xt_xres_apply_in_order(self, &ws, ws.ws_seqread.xseq_rec_log_id, ws.ws_seqread.xseq_rec_log_offset, record);
2090 break;
2091 }
2092 }
2093
 /* If the final sync reports TRUE, an XT_LOG_ENT_OP_SYNC record is
  * written to the log at the current read position.
  */
2094 if (xres_sync_operations(self, db, &ws)) {
2095 XTactOpSyncEntryDRec op_sync;
2096 time_t now = time(NULL);
2097
2098 op_sync.os_status_1 = XT_LOG_ENT_OP_SYNC;
2099 op_sync.os_checksum_1 = XT_CHECKSUM_1(now) ^ XT_CHECKSUM_1(ws.ws_seqread.xseq_rec_log_id);
2100 XT_SET_DISK_4(op_sync.os_time_4, (xtWord4) now);
2101 /* TODO: If this is done, check to see that
2102 * the byte written here are read back by the writter.
2103 * This is in order to be in sync with 'xl_log_bytes_written'.
2104 * i.e. xl_log_bytes_written == xl_log_bytes_read
2105 */
2106 if (!db->db_xlog.xlog_write_thru(&ws.ws_seqread, sizeof(XTactOpSyncEntryDRec), (xtWord1 *) &op_sync, self))
2107 xt_throw(self);
2108 }
2109 }
2110 catch_(a) {
 /* The exception is not re-thrown; the caller only sees FALSE. */
2111 ok = FALSE;
2112 }
2113 cont_(a);
2114
2115 if (ok) {
 /* bytes_to_read was only an estimate: emit any percentages that
  * were not reached so the progress output always ends at 100.
  */
2116 if (print_progress) {
2117 while (perc_complete <= 100) {
2118 if (((perc_complete - 1) % 25) == 0)
2119 xt_logf(XT_NT_INFO, "PBXT: ");
2120 if ((perc_complete % 25) == 0)
2121 xt_logf(XT_NT_INFO, "%2d\n", (int) perc_complete);
2122 else
2123 xt_logf(XT_NT_INFO, "%2d ", (int) perc_complete);
2124 xt_log_flush(self);
2125 xres_recover_progress(self, &progress_file, perc_complete);
2126 perc_complete++;
2127 }
2128 }
2129 if (bytes_to_read != 0)
2130 xt_logf(XT_NT_INFO, "PBXT: Recovering complete at %lu-%llu, bytes read: %llu\n", (u_long) ws.ws_seqread.xseq_rec_log_id, (u_llong) ws.ws_seqread.xseq_rec_log_offset, (u_llong) bytes_read);
2131
2132 *log_id = ws.ws_seqread.xseq_rec_log_id;
2133 *log_offset = ws.ws_seqread.xseq_rec_log_offset;
2134
2135 if (!min_ram_xn_id_set)
2136 /* This is true because if no transaction was placed in RAM then
2137 * the next transaction in RAM will have the next ID: */
2138 db->db_xn_min_ram_id = db->db_xn_curr_id + 1;
2139 }
2140
 /* Common cleanup for success and failure; passing 101 closes and
  * deletes the progress file.
  */
2141 failed:
2142 xt_free_writer_state(self, &ws);
2143 self->st_dlog_buf.dlb_exit(self);
2144 xres_recover_progress(self, &progress_file, 101);
2145 return ok;
2146}
2147
2148xtBool XTXactRestart::xres_is_checkpoint_pending(xtLogID curr_log_id, xtLogOffset curr_log_offset)
2149{
2150 return xt_bytes_since_last_checkpoint(xres_db, curr_log_id, curr_log_offset) >= xt_db_checkpoint_frequency / 2;
2151}
2152
2153/*
2154 * Calculate the bytes to be read for recovery.
2155 * This is only an estimate of the number of bytes that
2156 * will be read.
2157 */
2158off_t XTXactRestart::xres_bytes_to_read(XTThreadPtr self, XTDatabaseHPtr db, u_int *log_count, xtLogID *max_log_id)
2159{
2160 off_t to_read = 0, eof;
2161 xtLogID log_id = xres_cp_log_id;
2162 char log_path[PATH_MAX];
2163 XTOpenFilePtr of;
2164 XTXactLogHeaderDRec log_head;
2165 size_t head_size;
2166 size_t red_size;
2167
2168 *max_log_id = log_id;
2169 *log_count = 0;
2170 for (;;) {
2171 db->db_xlog.xlog_name(PATH_MAX, log_path, log_id);
2172 of = NULL;
2173 if (!xt_open_file_ns(&of, log_path, XT_FS_MISSING_OK))
2174 xt_throw(self);
2175 if (!of)
2176 break;
2177 pushr_(xt_close_file, of);
2178
2179 /* Check the first record of the log, to see if it is valid. */
2180 if (!xt_pread_file(of, 0, sizeof(XTXactLogHeaderDRec), 0, (xtWord1 *) &log_head, &red_size, &self->st_statistics.st_xlog, self))
2181 xt_throw(self);
2182 /* The minimum size (old log size): */
2183 if (red_size < XT_MIN_LOG_HEAD_SIZE)
2184 goto done;
2185 head_size = XT_GET_DISK_4(log_head.xh_size_4);
2186 if (log_head.xh_status_1 != XT_LOG_ENT_HEADER)
2187 goto done;
2188 if (log_head.xh_checksum_1 != XT_CHECKSUM_1(log_id))
2189 goto done;
2190 if (XT_LOG_HEAD_MAGIC(&log_head, head_size) != XT_LOG_FILE_MAGIC)
2191 goto done;
2192 if (head_size > offsetof(XTXactLogHeaderDRec, xh_log_id_4) + 4) {
2193 if (XT_GET_DISK_4(log_head.xh_log_id_4) != log_id)
2194 goto done;
2195 }
2196 if (head_size > offsetof(XTXactLogHeaderDRec, xh_version_2) + 4) {
2197 if (XT_GET_DISK_2(log_head.xh_version_2) > XT_LOG_VERSION_NO)
2198 xt_throw_ulxterr(XT_CONTEXT, XT_ERR_NEW_TYPE_OF_XLOG, (u_long) log_id);
2199 }
2200
2201 eof = xt_seek_eof_file(self, of);
2202 freer_(); // xt_close_file(of)
2203 if (log_id == xres_cp_log_id)
2204 to_read += (eof - xres_cp_log_offset);
2205 else
2206 to_read += eof;
2207 (*log_count)++;
2208 *max_log_id = log_id;
2209 log_id++;
2210 }
2211 return to_read;
2212
2213 done:
2214 freer_(); // xt_close_file(of)
2215 return to_read;
2216}
2217
2218
2219/* ----------------------------------------------------------------------
2220 * C H E C K P O I N T P R O C E S S
2221 */
2222
/* Which of a table's files a flush-list entry refers to; the values
 * select between xt_flush_record_row() and xt_flush_indices() in
 * xres_cp_flush_files().
 */
2223typedef enum XTFileType {
2224 XT_FT_RECROW_FILE,
2225 XT_FT_INDEX_FILE
2226} XTFileType;
2227
/* One entry of the flush list built by xres_cp_flush_files(): a table
 * ID plus the kind of file to flush.
 */
2228typedef struct XTDirtyFile {
2229 xtTableID df_tab_id;
2230 XTFileType df_file_type;
2231} XTDirtyFileRec, *XTDirtyFilePtr;
2232
/* Capacity of the flush list. */
2233#define XT_MAX_FLUSH_FILES 200
/* A file is queued for flushing once this many bytes (2 MB) are pending. */
2234#define XT_FLUSH_THRESHOLD (2 * 1024 * 1024)
2235
2236/* Sort files to be flushed. */
2237#ifdef USE_LATER
2238static void xres_cp_flush_files(XTThreadPtr self, XTDatabaseHPtr db)
2239{
2240 u_int edx;
2241 XTTableEntryPtr te;
2242 XTDirtyFileRec flush_list[XT_MAX_FLUSH_FILES];
2243 u_int file_count = 0;
2244 XTIndexPtr *iptr;
2245 u_int dirty_blocks;
2246 XTOpenTablePtr ot;
2247 XTTableHPtr tab;
2248
2249 retry:
2250 xt_enum_tables_init(&edx);
2251 xt_ht_lock(self, db->db_tables);
2252 pushr_(xt_ht_unlock, db->db_tables);
2253 while (file_count < XT_MAX_FLUSH_FILES &&
2254 (te = xt_enum_tables_next(self, db, &edx))) {
2255 if ((tab = te->te_table)) {
2256 if (tab->tab_bytes_to_flush >= XT_FLUSH_THRESHOLD) {
2257 flush_list[file_count].df_tab_id = te->te_tab_id;
2258 flush_list[file_count].df_file_type = XT_FT_RECROW_FILE;
2259 file_count++;
2260 }
2261 if (file_count == XT_MAX_FLUSH_FILES)
2262 break;
2263 iptr = tab->tab_dic.dic_keys;
2264 dirty_blocks = 0;
2265 for (u_int i=0;i<tab->tab_dic.dic_key_count; i++) {
2266 dirty_blocks += (*iptr)->mi_dirty_blocks;
2267 iptr++;
2268 }
2269 if ((dirty_blocks * XT_INDEX_PAGE_SIZE) >= XT_FLUSH_THRESHOLD) {
2270 flush_list[file_count].df_tab_id = te->te_tab_id;
2271 flush_list[file_count].df_file_type = XT_FT_INDEX_FILE;
2272 file_count++;
2273 }
2274 }
2275 }
2276 freer_(); // xt_ht_unlock(db->db_tables)
2277
2278 for (u_int i=0;i<file_count && !self->t_quit; i++) {
2279 /* We want to flush about once a second: */
2280 xt_sleep_milli_second(400);
2281 if ((ot = xt_db_open_pool_table(self, db, flush_list[i].df_tab_id, NULL, TRUE))) {
2282 pushr_(xt_db_return_table_to_pool, ot);
2283
2284 if (flush_list[i].df_file_type == XT_FT_RECROW_FILE) {
2285 if (!xt_flush_record_row(ot, NULL))
2286 xt_throw(self);
2287 }
2288 else {
2289 if (!xt_flush_indices(ot, NULL))
2290 xt_throw(self);
2291 }
2292
2293 freer_(); // xt_db_return_table_to_pool(ot)
2294 }
2295 }
2296
2297 if (file_count == 100)
2298 goto retry;
2299}
2300#endif
2301
/* NOTE(review): everything between this '#ifdef xxx' and the matching
 * '#endif' is disabled unless 'xxx' is defined. It holds
 * a method that recomputes xres_cp_pending from
 * xres_is_checkpoint_pending() (tracing changes of the flag when
 * TRACE_CHECKPOINT_ACTIVITY is defined), followed by a statement
 * fragment that is not inside any function and calls the method without
 * arguments, so the region would not compile if enabled as it stands.
 */
2302#ifdef xxx
2303void XTXactRestart::xres_checkpoint_pending(xtLogID log_id, xtLogOffset log_offset)
2304{
2305#ifdef TRACE_CHECKPOINT_ACTIVITY
2306 xtBool tmp = xres_cp_pending;
2307#endif
2308 xres_cp_pending = xres_is_checkpoint_pending(log_id, log_offset);
2309#ifdef TRACE_CHECKPOINT_ACTIVITY
 /* Only print when the flag actually changed value. */
2310 if (tmp) {
2311 if (!xres_cp_pending)
2312 printf("%s xres_cp_pending = FALSE\n", xt_get_self()->t_name);
2313 }
2314 else {
2315 if (xres_cp_pending)
2316 printf("%s xres_cp_pending = TRUE\n", xt_get_self()->t_name);
2317 }
2318#endif
2319}
2320
2321
 /* Fragment: an early-out test ("nothing required, nothing pending, no
  * data logs on the to-delete or deleted lists").
  */
2322 xres_checkpoint_pending();
2323
2324 if (!xres_cp_required &&
2325 !xres_cp_pending &&
2326 xt_sl_get_size(db->db_datalogs.dlc_to_delete) == 0 &&
2327 xt_sl_get_size(db->db_datalogs.dlc_deleted) == 0)
2328 return FALSE;
2329#endif
2330
/* Debug switch: when NEVER_CHECKPOINT is defined and this flag is TRUE,
 * xres_cp_checkpoint() returns immediately without doing anything.
 */
2331#ifdef NEVER_CHECKPOINT
2332xtBool no_checkpoint = TRUE;
2333#endif
2334
/* Values of 'check_type' in xres_cp_checkpoint():
 *  IF_NO_ACTIVITY    - stop flushing as soon as writer activity is seen.
 *  PAUSE_IF_ACTIVITY - keep flushing, but sleep 400 ms after each 2 MB
 *                      flushed while there is writer activity.
 *  NO_PAUSE          - flush without stopping or pausing.
 */
2335#define XT_CHECKPOINT_IF_NO_ACTIVITY 0
2336#define XT_CHECKPOINT_PAUSE_IF_ACTIVITY 1
2337#define XT_CHECKPOINT_NO_PAUSE 2
2338
2339/*
2340 * This function performs table flush, as long as the system is idle.
2341 */
/*
 * Begin a checkpoint (if none is running), flush the tables on the
 * checkpoint's table list one by one, then try to end the checkpoint.
 *
 * self               the checkpointer thread.
 * db                 the database being checkpointed.
 * curr_writer_total  value of db->db_xn_total_writer_count sampled by
 *                    the caller; a differing current value is taken as
 *                    "there has been writer activity".
 * force_checkpoint   FALSE: give up as soon as activity is detected.
 *                    TRUE: carry on despite activity - without pausing
 *                    if xres_cp_required is set, otherwise with pauses.
 *
 * Returns the 'checkpoint_done' value reported by xt_end_checkpoint().
 * Throws if beginning/ending the checkpoint or a flush fails.
 */
2342static xtBool xres_cp_checkpoint(XTThreadPtr self, XTDatabaseHPtr db, u_int curr_writer_total, xtBool force_checkpoint)
2343{
2344 XTCheckPointStatePtr cp = &db->db_cp_state;
2345 XTOpenTablePtr ot;
2346 XTCheckPointTablePtr to_flush_ptr;
2347 XTCheckPointTableRec to_flush;
2348 u_int table_count = 0;
2349 xtBool checkpoint_done;
2350 off_t bytes_flushed = 0;
2351 int check_type;
2352
2353#ifdef NEVER_CHECKPOINT
2354 if (no_checkpoint)
2355 return FALSE;
2356#endif
2357 if (force_checkpoint) {
2358 if (db->db_restart.xres_cp_required)
2359 check_type = XT_CHECKPOINT_NO_PAUSE;
2360 else
2361 check_type = XT_CHECKPOINT_PAUSE_IF_ACTIVITY;
2362 }
2363 else
2364 check_type = XT_CHECKPOINT_IF_NO_ACTIVITY;
2365
2366 to_flush.cpt_tab_id = 0;
2367 to_flush.cpt_flushed = 0;
2368
2369 /* Start a checkpoint: */
2370 if (!xt_begin_checkpoint(db, FALSE, self))
2371 xt_throw(self);
2372
2373 while (!self->t_quit) {
 /* Under the state lock: stop if the checkpoint is no longer running
  * or all tables are flushed, otherwise copy the next list entry.
  */
2374 xt_lock_mutex_ns(&cp->cp_state_lock);
2375 table_count = 0;
2376 if (cp->cp_table_ids)
2377 table_count = xt_sl_get_size(cp->cp_table_ids);
2378 if (!cp->cp_running || cp->cp_flush_count >= table_count) {
2379 xt_unlock_mutex_ns(&cp->cp_state_lock);
2380 break;
2381 }
 /* NOTE(review): '>' lets an index equal to table_count through; this
  * presumably relies on xt_sl_item_at() returning NULL for it, which
  * then takes the "else cp_next_to_flush++" path below - confirm.
  */
2382 if (cp->cp_next_to_flush > table_count)
2383 cp->cp_next_to_flush = 0;
2384
2385 to_flush_ptr = (XTCheckPointTablePtr) xt_sl_item_at(cp->cp_table_ids, cp->cp_next_to_flush);
2386 if (to_flush_ptr)
2387 to_flush = *to_flush_ptr;
2388 xt_unlock_mutex_ns(&cp->cp_state_lock);
2389
2390 if (to_flush_ptr) {
2391 if ((ot = xt_db_open_pool_table(self, db, to_flush.cpt_tab_id, NULL, TRUE))) {
2392 pushr_(xt_db_return_table_to_pool, ot);
2393
2394 if (!(to_flush.cpt_flushed & XT_CPT_REC_ROW_FLUSHED)) {
2395 if (!xt_flush_record_row(ot, &bytes_flushed, FALSE))
2396 xt_throw(self);
2397 }
2398
 /* Re-read the list entry: the state lock was released while
  * flushing, so the checkpoint may have stopped or the entry's
  * flags may have changed in the meantime.
  */
2399 xt_lock_mutex_ns(&cp->cp_state_lock);
2400 to_flush_ptr = NULL;
2401 if (cp->cp_running)
2402 to_flush_ptr = (XTCheckPointTablePtr) xt_sl_item_at(cp->cp_table_ids, cp->cp_next_to_flush);
2403 if (to_flush_ptr)
2404 to_flush = *to_flush_ptr;
2405 xt_unlock_mutex_ns(&cp->cp_state_lock);
2406
2407 if (to_flush_ptr && !self->t_quit) {
2408 if (!(to_flush.cpt_flushed & XT_CPT_INDEX_FLUSHED)) {
 /* Same activity check as at the bottom of the loop, done here
  * between the record/row flush and the index flush.
  */
2409 switch (check_type) {
2410 case XT_CHECKPOINT_IF_NO_ACTIVITY:
2411 if (bytes_flushed > 0 && curr_writer_total != db->db_xn_total_writer_count) {
2412 freer_(); // xt_db_return_table_to_pool(ot)
2413 goto end_checkpoint;
2414 }
2415 break;
2416 case XT_CHECKPOINT_PAUSE_IF_ACTIVITY:
2417 if (bytes_flushed > 2 * 1024 * 1024 && curr_writer_total != db->db_xn_total_writer_count) {
2418 curr_writer_total = db->db_xn_total_writer_count;
2419 bytes_flushed = 0;
2420 xt_sleep_milli_second(400);
2421 }
2422 break;
2423 case XT_CHECKPOINT_NO_PAUSE:
2424 break;
2425 }
2426
2427 if (!self->t_quit) {
2428 if (!xt_flush_indices(ot, &bytes_flushed, FALSE))
2429 xt_throw(self);
 /* Only the local copy is updated here. */
2430 to_flush.cpt_flushed |= XT_CPT_INDEX_FLUSHED;
2431 }
2432 }
2433 }
2434
2435 freer_(); // xt_db_return_table_to_pool(ot)
2436 }
2437
 /* NOTE(review): cp_next_to_flush is modified here (and in the else
  * branch) without holding cp_state_lock - confirm this is safe.
  */
2438 if ((to_flush.cpt_flushed & XT_CPT_ALL_FLUSHED) == XT_CPT_ALL_FLUSHED)
2439 cp->cp_next_to_flush++;
2440 }
2441 else
2442 cp->cp_next_to_flush++;
2443
2444 if (self->t_quit)
2445 break;
2446
2447 switch (check_type) {
2448 case XT_CHECKPOINT_IF_NO_ACTIVITY:
2449 if (bytes_flushed > 0 && curr_writer_total != db->db_xn_total_writer_count)
2450 goto end_checkpoint;
2451 break;
2452 case XT_CHECKPOINT_PAUSE_IF_ACTIVITY:
2453 if (bytes_flushed > 2 * 1024 * 1024 && curr_writer_total != db->db_xn_total_writer_count) {
2454 curr_writer_total = db->db_xn_total_writer_count;
2455 bytes_flushed = 0;
2456 xt_sleep_milli_second(400);
2457 }
2458 break;
2459 case XT_CHECKPOINT_NO_PAUSE:
2460 break;
2461 }
2462 }
2463
2464 end_checkpoint:
 /* checkpoint_done is only assigned by xt_end_checkpoint(). */
2465 if (!xt_end_checkpoint(db, self, &checkpoint_done))
2466 xt_throw(self);
2467 return checkpoint_done;
2468}
2469
2470
2471/* Wait for the log writer to tell us to do something.
2472 */
2473static void xres_cp_wait_for_log_writer(XTThreadPtr self, XTDatabaseHPtr db, u_long milli_secs)
2474{
2475 xt_lock_mutex(self, &db->db_cp_lock);
2476 pushr_(xt_unlock_mutex, &db->db_cp_lock);
2477 if (!self->t_quit)
2478 xt_timed_wait_cond(self, &db->db_cp_cond, &db->db_cp_lock, milli_secs);
2479 freer_(); // xt_unlock_mutex(&db->db_cp_lock)
2480}
2481
2482/*
2483 * This is the way checkpoint works:
2484 *
2485 * To write a checkpoint we need to flush all tables in
2486 * the database.
2487 *
2488 * Before flushing the first table we get the checkpoint
2489 * log position.
2490 *
2491 * After flushing all files we write of the checkpoint
2492 * log position.
2493 */
2494static void xres_cp_main(XTThreadPtr self)
2495{
2496 XTDatabaseHPtr db = self->st_database;
2497 u_int curr_writer_total;
2498 time_t now;
2499
2500 xt_set_low_priority(self);
2501
2502
2503 while (!self->t_quit) {
2504 /* Wait 2 seconds: */
2505 curr_writer_total = db->db_xn_total_writer_count;
2506 xt_db_approximate_time = time(NULL);
2507 now = xt_db_approximate_time;
2508 while (!self->t_quit && xt_db_approximate_time < now + 2 && !db->db_restart.xres_cp_required) {
2509 xres_cp_wait_for_log_writer(self, db, 400);
2510 xt_db_approximate_time = time(NULL);
2511 xt_db_free_unused_open_tables(self, db);
2512 }
2513
2514 if (self->t_quit)
2515 break;
2516
2517 if (curr_writer_total == db->db_xn_total_writer_count)
2518 /* No activity in 2 seconds: */
2519 xres_cp_checkpoint(self, db, curr_writer_total, FALSE);
2520 else {
2521 /* There server is busy, check if we need to
2522 * write a checkpoint anyway...
2523 */
2524 if (db->db_restart.xres_cp_required ||
2525 db->db_restart.xres_is_checkpoint_pending(db->db_xlog.xl_write_log_id, db->db_xlog.xl_write_log_offset)) {
2526 /* Flush tables, until the checkpoint is complete. */
2527 xres_cp_checkpoint(self, db, curr_writer_total, TRUE);
2528 }
2529 }
2530
2531 if (curr_writer_total == db->db_xn_total_writer_count) {
2532 /* We did a checkpoint, and still, nothing has
2533 * happened....
2534 *
2535 * Wait for something to happen:
2536 */
2537 xtLogID log_id;
2538 xtLogOffset log_offset;
2539
2540 while (!self->t_quit && curr_writer_total == db->db_xn_total_writer_count) {
2541 /* The writer position: */
2542 xt_lock_mutex(self, &db->db_wr_lock);
2543 pushr_(xt_unlock_mutex, &db->db_wr_lock);
2544 log_id = db->db_wr_log_id;
2545 log_offset = db->db_wr_log_offset;
2546 freer_(); // xt_unlock_mutex(&db->db_wr_lock)
2547
2548 /* This condition means we could checkpoint: */
2549 if (!(xt_sl_get_size(db->db_datalogs.dlc_to_delete) == 0 &&
2550 xt_sl_get_size(db->db_datalogs.dlc_deleted) == 0 &&
2551 xt_comp_log_pos(log_id, log_offset, db->db_restart.xres_cp_log_id, db->db_restart.xres_cp_log_offset) <= 0))
2552 break;
2553
2554 xres_cp_wait_for_log_writer(self, db, 400);
2555 xt_db_approximate_time = time(NULL);
2556 xt_db_free_unused_open_tables(self, db);
2557 }
2558 }
2559 }
2560}
2561
2562static void *xres_cp_run_thread(XTThreadPtr self)
2563{
2564 XTDatabaseHPtr db = (XTDatabaseHPtr) self->t_data;
2565 int count;
2566 void *mysql_thread;
2567
2568 mysql_thread = myxt_create_thread();
2569
2570 while (!self->t_quit) {
2571 try_(a) {
2572 /*
2573 * The garbage collector requires that the database
2574 * is in use because.
2575 */
2576 xt_use_database(self, db, XT_FOR_CHECKPOINTER);
2577
2578 /* This action is both safe and required (see details elsewhere) */
2579 xt_heap_release(self, self->st_database);
2580
2581 xres_cp_main(self);
2582 }
2583 catch_(a) {
2584 /* This error is "normal"! */
2585 if (self->t_exception.e_xt_err != XT_ERR_NO_DICTIONARY &&
2586 !(self->t_exception.e_xt_err == XT_SIGNAL_CAUGHT &&
2587 self->t_exception.e_sys_err == SIGTERM))
2588 xt_log_and_clear_exception(self);
2589 }
2590 cont_(a);
2591
2592 /* Avoid releasing the database (done above) */
2593 self->st_database = NULL;
2594 xt_unuse_database(self, self);
2595
2596 /* After an exception, pause before trying again... */
2597 /* Number of seconds */
2598 count = 60;
2599 while (!self->t_quit && count > 0) {
2600 sleep(1);
2601 count--;
2602 }
2603 }
2604
2605 myxt_destroy_thread(mysql_thread, TRUE);
2606 return NULL;
2607}
2608
2609static void xres_cp_free_thread(XTThreadPtr self, void *data)
2610{
2611 XTDatabaseHPtr db = (XTDatabaseHPtr) data;
2612
2613 if (db->db_cp_thread) {
2614 xt_lock_mutex(self, &db->db_cp_lock);
2615 pushr_(xt_unlock_mutex, &db->db_cp_lock);
2616 db->db_cp_thread = NULL;
2617 freer_(); // xt_unlock_mutex(&db->db_cp_lock)
2618 }
2619}
2620
2621/* Start a checkpoint, if none has been started. */
2622xtPublic xtBool xt_begin_checkpoint(XTDatabaseHPtr db, xtBool have_table_lock, XTThreadPtr thread)
2623{
2624 XTCheckPointStatePtr cp = &db->db_cp_state;
2625 xtLogID log_id;
2626 xtLogOffset log_offset;
2627 xtLogID ind_rec_log_id;
2628 xtLogOffset ind_rec_log_offset;
2629 u_int edx;
2630 XTTableEntryPtr te_ptr;
2631 XTTableHPtr tab;
2632 XTOperationPtr op;
2633 XTCheckPointTableRec cpt;
2634 XTSortedListPtr tables = NULL;
2635
2636 /* First check if a checkpoint is already running: */
2637 xt_lock_mutex_ns(&cp->cp_state_lock);
2638 if (cp->cp_running) {
2639 xt_unlock_mutex_ns(&cp->cp_state_lock);
2640 return OK;
2641 }
2642 if (cp->cp_table_ids) {
2643 xt_free_sortedlist(NULL, cp->cp_table_ids);
2644 cp->cp_table_ids = NULL;
2645 }
2646 xt_unlock_mutex_ns(&cp->cp_state_lock);
2647
2648 /* Flush the log before we continue. This is to ensure that
2649 * before we write a checkpoint, that the changes
2650 * done by the sweeper and the compactor, have been
2651 * applied.
2652 *
2653 * Note, the sweeper does not flush the log, so this is
2654 * necessary!
2655 *
2656 * --- I have removed this flush. It is actually just a
2657 * minor optimisation, which pushes the flush position
2658 * below ahead.
2659 *
2660 * Note that the writer position used for the checkpoint
2661 * _will_ be behind the current log flush position.
2662 *
2663 * This is because the writer cannot apply log changes
2664 * until they are flushed.
2665 */
2666 /* This is an alternative to the above.
2667 if (!xt_xlog_flush_log(self))
2668 xt_throw(self);
2669 */
2670 xt_lock_mutex_ns(&db->db_wr_lock);
2671
2672 /* The theoretical maximum restart log postion, is the
2673 * position of the writer thread:
2674 */
2675 log_id = db->db_wr_log_id;
2676 log_offset = db->db_wr_log_offset;
2677
2678 ind_rec_log_id = db->db_xlog.xl_flush_log_id;
2679 ind_rec_log_offset = db->db_xlog.xl_flush_log_offset;
2680
2681 xt_unlock_mutex_ns(&db->db_wr_lock);
2682
2683 /* Go through all the transactions, and find
2684 * the lowest log start position of all the transactions.
2685 */
2686 for (u_int i=0; i<XT_XN_NO_OF_SEGMENTS; i++) {
2687 XTXactSegPtr seg;
2688
2689 seg = &db->db_xn_idx[i];
2690 XT_XACT_READ_LOCK(&seg->xs_tab_lock, self);
2691 for (u_int j=0; j<XT_XN_HASH_TABLE_SIZE; j++) {
2692 XTXactDataPtr xact;
2693
2694 xact = seg->xs_table[j];
2695 while (xact) {
2696 /* If the transaction is logged, but not cleaned: */
2697 if ((xact->xd_flags & (XT_XN_XAC_LOGGED | XT_XN_XAC_CLEANED)) == XT_XN_XAC_LOGGED) {
2698 if (xt_comp_log_pos(log_id, log_offset, xact->xd_begin_log, xact->xd_begin_offset) > 0) {
2699 log_id = xact->xd_begin_log;
2700 log_offset = xact->xd_begin_offset;
2701 }
2702 }
2703 xact = xact->xd_next_xact;
2704 }
2705 }
2706 XT_XACT_UNLOCK(&seg->xs_tab_lock, self, FALSE);
2707 }
2708
2709#ifdef TRACE_CHECKPOINT
2710 printf("BEGIN CHECKPOINT %d-%llu\n", (int) log_id, (u_llong) log_offset);
2711#endif
2712 /* Go through all tables, and find the lowest log position.
2713 * The log position stored by each table shows the position of
2714 * the next operation that still needs to be applied.
2715 *
2716 * This comes from the list of operations which are
2717 * queued for the table.
2718 *
2719 * This function also builds a list of tables!
2720 */
2721
2722 if (!(tables = xt_new_sortedlist_ns(sizeof(XTCheckPointTableRec), 20, xres_comp_flush_tabs, NULL, NULL)))
2723 return FAILED;
2724
2725 xt_enum_tables_init(&edx);
2726 if (!have_table_lock)
2727 xt_ht_lock(NULL, db->db_tables);
2728 while ((te_ptr = xt_enum_tables_next(NULL, db, &edx))) {
2729 if ((tab = te_ptr->te_table)) {
2730 xt_sl_lock_ns(tab->tab_op_list, thread);
2731 if ((op = (XTOperationPtr) xt_sl_first_item(tab->tab_op_list))) {
2732 if (xt_comp_log_pos(log_id, log_offset, op->or_log_id, op->or_log_offset) > 0) {
2733 log_id = op->or_log_id;
2734 log_offset = op->or_log_offset;
2735 }
2736 }
2737 xt_sl_unlock(NULL, tab->tab_op_list);
2738 cpt.cpt_flushed = 0;
2739 cpt.cpt_tab_id = tab->tab_id;
2740#ifdef TRACE_CHECKPOINT
2741 printf("to flush: %d %s\n", (int) tab->tab_id, tab->tab_name->ps_path);
2742#endif
2743 if (!xt_sl_insert(NULL, tables, &tab->tab_id, &cpt)) {
2744 if (!have_table_lock)
2745 xt_ht_unlock(NULL, db->db_tables);
2746 xt_free_sortedlist(NULL, tables);
2747 return FAILED;
2748 }
2749 }
2750 }
2751 if (!have_table_lock)
2752 xt_ht_unlock(NULL, db->db_tables);
2753
2754 xt_lock_mutex_ns(&cp->cp_state_lock);
2755 /* If there is a table list, then someone was faster than me! */
2756 if (!cp->cp_running && log_id && log_offset) {
2757 cp->cp_running = TRUE;
2758 cp->cp_log_id = log_id;
2759 cp->cp_log_offset = log_offset;
2760
2761 cp->cp_ind_rec_log_id = ind_rec_log_id;
2762 cp->cp_ind_rec_log_offset = ind_rec_log_offset;
2763
2764 cp->cp_flush_count = 0;
2765 cp->cp_next_to_flush = 0;
2766 cp->cp_table_ids = tables;
2767 }
2768 else
2769 xt_free_sortedlist(NULL, tables);
2770 xt_unlock_mutex_ns(&cp->cp_state_lock);
2771
2772 /* At this point, log flushing can begin... */
2773 return OK;
2774}
2775
2776/* End a checkpoint, if a checkpoint has been started,
2777 * and all checkpoint tables have been flushed
2778 */
2779xtPublic xtBool xt_end_checkpoint(XTDatabaseHPtr db, XTThreadPtr thread, xtBool *checkpoint_done)
2780{
2781 XTCheckPointStatePtr cp = &db->db_cp_state;
2782 XTXlogCheckpointDPtr cp_buf = NULL;
2783 char path[PATH_MAX];
2784 XTOpenFilePtr of;
2785 u_int table_count;
2786 size_t chk_size = 0;
2787 u_int no_of_logs = 0;
2788
2789#ifdef NEVER_CHECKPOINT
2790 return OK;
2791#endif
2792 /* Lock the checkpoint state so that only on thread can do this! */
2793 xt_lock_mutex_ns(&cp->cp_state_lock);
2794 if (!cp->cp_running)
2795 goto checkpoint_done;
2796
2797 table_count = 0;
2798 if (cp->cp_table_ids)
2799 table_count = xt_sl_get_size(cp->cp_table_ids);
2800 if (cp->cp_flush_count < table_count) {
2801 /* Checkpoint is not done, yet! */
2802 xt_unlock_mutex_ns(&cp->cp_state_lock);
2803 if (checkpoint_done)
2804 *checkpoint_done = FALSE;
2805 return OK;
2806 }
2807
2808 /* Check if anything has changed since the last checkpoint,
2809 * if not, there is no need to write a new checkpoint!
2810 */
2811 if (xt_sl_get_size(db->db_datalogs.dlc_to_delete) == 0 &&
2812 xt_sl_get_size(db->db_datalogs.dlc_deleted) == 0 &&
2813 xt_comp_log_pos(cp->cp_log_id, cp->cp_log_offset, db->db_restart.xres_cp_log_id, db->db_restart.xres_cp_log_offset) <= 0) {
2814 /* A checkpoint is required if the size of the deleted
2815 * list is not zero. The reason is, I cannot remove the
2816 * logs from the deleted list BEFORE a checkpoint has been
2817 * done which does NOT include these logs.
2818 *
2819 * Even though the logs have already been deleted. They
2820 * remain on the deleted list to ensure that they are NOT
2821 * reused during this time, until the next checkpoint.
2822 *
2823 * This is done because if they are used, then on restart
2824 * they would be deleted!
2825 */
2826#ifdef TRACE_CHECKPOINT
2827 printf("--- END CHECKPOINT - no write\n");
2828#endif
2829 goto checkpoint_done;
2830 }
2831
2832#ifdef TRACE_CHECKPOINT
2833 printf("--- END CHECKPOINT - write start point\n");
2834#endif
2835 xt_lock_mutex_ns(&db->db_datalogs.dlc_lock);
2836
2837 no_of_logs = xt_sl_get_size(db->db_datalogs.dlc_to_delete);
2838 chk_size = offsetof(XTXlogCheckpointDRec, xcp_del_log) + no_of_logs * 2;
2839 xtLogID *log_id_ptr;
2840
2841 if (!(cp_buf = (XTXlogCheckpointDPtr) xt_malloc_ns(chk_size))) {
2842 xt_unlock_mutex_ns(&db->db_datalogs.dlc_lock);
2843 goto failed_0;
2844 }
2845
2846 /* Increment the checkpoint number. This value is used if 2 checkpoint have the
2847 * same log number. In this case checkpoints may differ in the log files
2848 * that should be deleted. Here it is important to use the most recent
2849 * log file!
2850 */
2851 db->db_restart.xres_cp_number++;
2852
2853 /* Create the checkpoint record: */
2854 XT_SET_DISK_4(cp_buf->xcp_head_size_4, chk_size);
2855 XT_SET_DISK_2(cp_buf->xcp_version_2, XT_CHECKPOINT_VERSION);
2856 XT_SET_DISK_6(cp_buf->xcp_chkpnt_no_6, db->db_restart.xres_cp_number);
2857 XT_SET_DISK_4(cp_buf->xcp_log_id_4, cp->cp_log_id);
2858 XT_SET_DISK_6(cp_buf->xcp_log_offs_6, cp->cp_log_offset);
2859 XT_SET_DISK_4(cp_buf->xcp_tab_id_4, db->db_curr_tab_id);
2860 XT_SET_DISK_4(cp_buf->xcp_xact_id_4, db->db_xn_curr_id);
2861 XT_SET_DISK_4(cp_buf->xcp_ind_rec_log_id_4, cp->cp_ind_rec_log_id);
2862 XT_SET_DISK_6(cp_buf->xcp_ind_rec_log_offs_6, cp->cp_ind_rec_log_offset);
2863 XT_SET_DISK_2(cp_buf->xcp_log_count_2, no_of_logs);
2864
2865 for (u_int i=0; i<no_of_logs; i++) {
2866 log_id_ptr = (xtLogID *) xt_sl_item_at(db->db_datalogs.dlc_to_delete, i);
2867 XT_SET_DISK_2(cp_buf->xcp_del_log[i], (xtWord2) *log_id_ptr);
2868 }
2869
2870 XT_SET_DISK_2(cp_buf->xcp_checksum_2, xt_get_checksum(((xtWord1 *) cp_buf) + 2, chk_size - 2, 1));
2871
2872 xt_unlock_mutex_ns(&db->db_datalogs.dlc_lock);
2873
2874 /* Write the checkpoint: */
2875 db->db_restart.xres_name(PATH_MAX, path, db->db_restart.xres_next_res_no);
2876 if (!(of = xt_open_file_ns(path, XT_FS_CREATE | XT_FS_MAKE_PATH)))
2877 goto failed_1;
2878
2879 if (!xt_set_eof_file(NULL, of, 0))
2880 goto failed_2;
2881 if (!xt_pwrite_file(of, 0, chk_size, (xtWord1 *) cp_buf, &thread->st_statistics.st_x, thread))
2882 goto failed_2;
2883 if (!xt_flush_file(of, &thread->st_statistics.st_x, thread))
2884 goto failed_2;
2885
2886 xt_close_file_ns(of);
2887
2888 /* Next time write the other restart file: */
2889 db->db_restart.xres_next_res_no = (db->db_restart.xres_next_res_no % 2) + 1;
2890 db->db_restart.xres_cp_log_id = cp->cp_log_id;
2891 db->db_restart.xres_cp_log_offset = cp->cp_log_offset;
2892 db->db_restart.xres_cp_required = FALSE;
2893
2894 /*
2895 * Remove all the data logs that were deleted on the
2896 * last checkpoint:
2897 */
2898 if (!xres_remove_data_logs(db))
2899 goto failed_0;
2900
2901#ifndef DEBUG_KEEP_LOGS
2902 /* After checkpoint, we can delete transaction logs that will no longer be required
2903 * for recovery...
2904 */
2905 if (cp->cp_log_id > 1) {
2906 xtLogID current_log_id = cp->cp_log_id;
2907 xtLogID del_log_id;
2908
2909#ifdef XT_NUMBER_OF_LOGS_TO_SAVE
2910 if (pbxt_crash_debug) {
2911 /* To save the logs, we just consider them in use: */
2912 if (current_log_id > XT_NUMBER_OF_LOGS_TO_SAVE)
2913 current_log_id -= XT_NUMBER_OF_LOGS_TO_SAVE;
2914 else
2915 current_log_id = 1;
2916 }
2917#endif
2918
2919 del_log_id = current_log_id - 1;
2920
2921 while (del_log_id > 0) {
2922 db->db_xlog.xlog_name(PATH_MAX, path, del_log_id);
2923 if (!xt_fs_exists(path))
2924 break;
2925 del_log_id--;
2926 }
2927
2928 /* This was the lowest log ID that existed: */
2929 del_log_id++;
2930
2931 /* Delete all logs that still exist, that come before
2932 * the current log:
2933 *
2934 * Do this from least to greatest to ensure no "holes" appear.
2935 */
2936 while (del_log_id < current_log_id) {
2937 switch (db->db_xlog.xlog_delete_log(del_log_id, thread)) {
2938 case OK:
2939 break;
2940 case FAILED:
2941 goto exit_loop;
2942 case XT_ERR:
2943 goto failed_0;
2944 }
2945 del_log_id++;
2946 }
2947 exit_loop:;
2948 }
2949
2950 /* And we can delete data logs in the list, and place them
2951 * on the deleted list.
2952 */
2953 xtLogID log_id;
2954 for (u_int i=0; i<no_of_logs; i++) {
2955 log_id = (xtLogID) XT_GET_DISK_2(cp_buf->xcp_del_log[i]);
2956 if (!xres_delete_data_log(db, log_id))
2957 goto failed_0;
2958 }
2959#endif
2960
2961 xt_free_ns(cp_buf);
2962 cp_buf = NULL;
2963
2964 checkpoint_done:
2965 cp->cp_running = FALSE;
2966 if (cp->cp_table_ids) {
2967 xt_free_sortedlist(NULL, cp->cp_table_ids);
2968 cp->cp_table_ids = NULL;
2969 }
2970 cp->cp_flush_count = 0;
2971 cp->cp_next_to_flush = 0;
2972 db->db_restart.xres_cp_required = FALSE;
2973 xt_unlock_mutex_ns(&cp->cp_state_lock);
2974 if (checkpoint_done)
2975 *checkpoint_done = TRUE;
2976 return OK;
2977
2978 failed_2:
2979 xt_close_file_ns(of);
2980
2981 failed_1:
2982 xt_free_ns(cp_buf);
2983
2984 failed_0:
2985 if (cp_buf)
2986 xt_free_ns(cp_buf);
2987 xt_unlock_mutex_ns(&cp->cp_state_lock);
2988 return FAILED;
2989}
2990
2991xtPublic xtWord8 xt_bytes_since_last_checkpoint(XTDatabaseHPtr db, xtLogID curr_log_id, xtLogOffset curr_log_offset)
2992{
2993 xtLogID log_id;
2994 xtLogOffset log_offset;
2995 size_t byte_count = 0;
2996
2997 log_id = db->db_restart.xres_cp_log_id;
2998 log_offset = db->db_restart.xres_cp_log_offset;
2999
3000 /* Assume the logs have the threshold: */
3001 if (log_id < curr_log_id) {
3002 if (log_offset < xt_db_log_file_threshold)
3003 byte_count = (size_t) (xt_db_log_file_threshold - log_offset);
3004 log_offset = 0;
3005 log_id++;
3006 }
3007 while (log_id < curr_log_id) {
3008 byte_count += (size_t) xt_db_log_file_threshold;
3009 log_id++;
3010 }
3011 if (log_offset < curr_log_offset)
3012 byte_count += (size_t) (curr_log_offset - log_offset);
3013
3014 return byte_count;
3015}
3016
3017xtPublic void xt_start_checkpointer(XTThreadPtr self, XTDatabaseHPtr db)
3018{
3019 char name[PATH_MAX];
3020
3021 sprintf(name, "CP-%s", xt_last_directory_of_path(db->db_main_path));
3022 xt_remove_dir_char(name);
3023 db->db_cp_thread = xt_create_daemon(self, name);
3024 xt_set_thread_data(db->db_cp_thread, db, xres_cp_free_thread);
3025 xt_run_thread(self, db->db_cp_thread, xres_cp_run_thread);
3026}
3027
3028xtPublic void xt_wait_for_checkpointer(XTThreadPtr self, XTDatabaseHPtr db)
3029{
3030 time_t then, now;
3031 xtBool message = FALSE;
3032 xtLogID log_id;
3033 xtLogOffset log_offset;
3034
3035 if (db->db_cp_thread) {
3036 then = time(NULL);
3037 for (;;) {
3038 xt_lock_mutex(self, &db->db_wr_lock);
3039 pushr_(xt_unlock_mutex, &db->db_wr_lock);
3040 log_id = db->db_wr_log_id;
3041 log_offset = db->db_wr_log_offset;
3042 freer_(); // xt_unlock_mutex(&db->db_wr_lock)
3043
3044 if (xt_sl_get_size(db->db_datalogs.dlc_to_delete) == 0 &&
3045 xt_sl_get_size(db->db_datalogs.dlc_deleted) == 0 &&
3046 xt_comp_log_pos(log_id, log_offset, db->db_restart.xres_cp_log_id, db->db_restart.xres_cp_log_offset) <= 0)
3047 break;
3048
3049 /* Do a final checkpoint before shutdown: */
3050 db->db_restart.xres_cp_required = TRUE;
3051
3052 xt_lock_mutex(self, &db->db_cp_lock);
3053 pushr_(xt_unlock_mutex, &db->db_cp_lock);
3054 if (!xt_broadcast_cond_ns(&db->db_cp_cond)) {
3055 xt_log_and_clear_exception_ns();
3056 break;
3057 }
3058 freer_(); // xt_unlock_mutex(&db->db_cp_lock)
3059
3060 xt_sleep_milli_second(10);
3061
3062 now = time(NULL);
3063 if (now >= then + 16) {
3064 xt_logf(XT_NT_INFO, "Aborting wait for '%s' checkpointer\n", db->db_name);
3065 message = FALSE;
3066 break;
3067 }
3068 if (now >= then + 2) {
3069 if (!message) {
3070 message = TRUE;
3071 xt_logf(XT_NT_INFO, "Waiting for '%s' checkpointer...\n", db->db_name);
3072 }
3073 }
3074 }
3075
3076 if (message)
3077 xt_logf(XT_NT_INFO, "Checkpointer '%s' done.\n", db->db_name);
3078 }
3079}
3080
3081xtPublic void xt_stop_checkpointer(XTThreadPtr self, XTDatabaseHPtr db)
3082{
3083 XTThreadPtr thr_wr;
3084
3085 if (db->db_cp_thread) {
3086 xt_lock_mutex(self, &db->db_cp_lock);
3087 pushr_(xt_unlock_mutex, &db->db_cp_lock);
3088
3089 /* This pointer is safe as long as you have the transaction lock. */
3090 if ((thr_wr = db->db_cp_thread)) {
3091 xtThreadID tid = thr_wr->t_id;
3092
3093 /* Make sure the thread quits when woken up. */
3094 xt_terminate_thread(self, thr_wr);
3095
3096 xt_wake_checkpointer(self, db);
3097
3098 freer_(); // xt_unlock_mutex(&db->db_cp_lock)
3099
3100 /*
3101 * GOTCHA: This is a wierd thing but the SIGTERM directed
3102 * at a particular thread (in this case the sweeper) was
3103 * being caught by a different thread and killing the server
3104 * sometimes. Disconcerting.
3105 * (this may only be a problem on Mac OS X)
3106 xt_kill_thread(thread);
3107 */
3108 xt_wait_for_thread(tid, FALSE);
3109
3110 /* PMC - This should not be necessary to set the signal here, but in the
3111 * debugger the handler is not called!!?
3112 thr_wr->t_delayed_signal = SIGTERM;
3113 xt_kill_thread(thread);
3114 */
3115 db->db_cp_thread = NULL;
3116 }
3117 else
3118 freer_(); // xt_unlock_mutex(&db->db_cp_lock)
3119 }
3120}
3121
3122xtPublic void xt_wake_checkpointer(XTThreadPtr self, XTDatabaseHPtr db)
3123{
3124 if (!xt_broadcast_cond_ns(&db->db_cp_cond))
3125 xt_log_and_clear_exception(self);
3126}
3127
3128xtPublic void xt_free_writer_state(struct XTThread *self, XTWriterStatePtr ws)
3129{
3130 if (ws->ws_db)
3131 ws->ws_db->db_xlog.xlog_seq_exit(&ws->ws_seqread);
3132 xt_db_set_size(self, &ws->ws_databuf, 0);
3133 xt_ib_free(self, &ws->ws_rec_buf);
3134 if (ws->ws_ot) {
3135 xt_db_return_table_to_pool(self, ws->ws_ot);
3136 ws->ws_ot = NULL;
3137 }
3138}
3139
3140xtPublic void xt_dump_xlogs(XTDatabaseHPtr db, xtLogID start_log)
3141{
3142 XTXactSeqReadRec seq;
3143 XTXactLogBufferDPtr record;
3144 xtLogID log_id = db->db_restart.xres_cp_log_id;
3145 char log_path[PATH_MAX];
3146 XTThreadPtr thread = xt_get_self();
3147
3148 /* Find the first log that still exists:*/
3149 for (;;) {
3150 log_id--;
3151 db->db_xlog.xlog_name(PATH_MAX, log_path, log_id);
3152 if (!xt_fs_exists(log_path))
3153 break;
3154 }
3155 log_id++;
3156
3157 if (!db->db_xlog.xlog_seq_init(&seq, xt_db_log_buffer_size, FALSE))
3158 return;
3159
3160 if (log_id < start_log)
3161 log_id = start_log;
3162
3163 for (;;) {
3164 db->db_xlog.xlog_name(PATH_MAX, log_path, log_id);
3165 if (!xt_fs_exists(log_path))
3166 break;
3167
3168 if (!db->db_xlog.xlog_seq_start(&seq, log_id, 0, FALSE))
3169 goto done;
3170
3171 PRINTF("---------- DUMP LOG %d\n", (int) log_id);
3172 for (;;) {
3173 if (!db->db_xlog.xlog_seq_next(&seq, &record, TRUE, thread)) {
3174 PRINTF("---------- DUMP LOG %d ERROR\n", (int) log_id);
3175 xt_log_and_clear_exception_ns();
3176 break;
3177 }
3178 if (!record) {
3179 PRINTF("---------- DUMP LOG %d DONE\n", (int) log_id);
3180 break;
3181 }
3182 xt_print_log_record(seq.xseq_rec_log_id, seq.xseq_rec_log_offset, record);
3183 }
3184
3185 log_id++;
3186 }
3187
3188 done:
3189 db->db_xlog.xlog_seq_exit(&seq);
3190}
3191
3192/* ----------------------------------------------------------------------
3193 * D A T A B A S E R E C O V E R Y T H R E A D
3194 */
3195
3196extern XTDatabaseHPtr pbxt_database;
3197
3198static void *xn_xres_run_recovery_thread(XTThreadPtr self)
3199{
The diff has been truncated for viewing.

Subscribers

People subscribed via source and target branches