Merge lp:~akopytov/percona-xtrabackup/bug1095249-2.0 into lp:percona-xtrabackup/2.0

Proposed by Alexey Kopytov
Status: Merged
Approved by: Laurynas Biveinis
Approved revision: no longer in the source branch.
Merged at revision: 494
Proposed branch: lp:~akopytov/percona-xtrabackup/bug1095249-2.0
Merge into: lp:percona-xtrabackup/2.0
Diff against target: 378 lines (+289/-8)
6 files modified
src/Makefile (+2/-1)
src/buffer.c (+197/-0)
src/buffer.h (+31/-0)
src/compress.c (+22/-4)
src/datasink.h (+5/-3)
test/t/ib_compress_basic.sh (+32/-0)
To merge this branch: bzr merge lp:~akopytov/percona-xtrabackup/bug1095249-2.0
Reviewer | Review Type | Date Requested | Status
Laurynas Biveinis (community) | | | Approve
George Ormond Lorch III (community) | g2 | | Approve
Review via email: mp+141769@code.launchpad.net
To post a comment you must log in.
Revision history for this message
George Ormond Lorch III (gl-az) wrote :

I see now why you re-wrote the ds chaining in 2.1. I don't like that the compression sink needs to manage its connection to the next sink (the buffer in this case) rather than having it done at the higher level in xtrabackup.c like it is now done in 2.1.

Anyway, looks like a reasonable fix to the issue.

review: Approve (g2)
Revision history for this message
Alexey Kopytov (akopytov) wrote :

http://jenkins.percona.com/view/XtraBackup/job/percona-xtrabackup-2.0-param/319/ (though compression tests are not currently executed in Jenkins anyway).

Revision history for this message
Laurynas Biveinis (laurynas-biveinis) wrote :

The commit message includes the title of a wrong bug.
OK with it fixed.

review: Approve
Revision history for this message
Alexey Kopytov (akopytov) wrote :

Hi Laurynas,

On Sun, 06 Jan 2013 11:38:21 -0000, Laurynas Biveinis wrote:
> Review: Approve
>
> The commit message includes the title of a wrong bug.
> OK with it fixed.
>

Fixed.

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'src/Makefile'
2--- src/Makefile 2012-02-10 20:05:56 +0000
3+++ src/Makefile 2013-01-07 06:22:20 +0000
4@@ -23,7 +23,8 @@
5 BIN_DIR=$(PREFIX)/bin
6
7 COMMON_INC = -I. -I libarchive/libarchive -I quicklz
8-XTRABACKUPOBJS = xtrabackup.o stream.o local.o compress.o xbstream_write.o \
9+XTRABACKUPOBJS = xtrabackup.o stream.o local.o compress.o buffer.o \
10+ xbstream_write.o \
11 quicklz/quicklz.o
12 XBSTREAMOBJS = xbstream.o xbstream_write.o xbstream_read.o
13
14
15=== added file 'src/buffer.c'
16--- src/buffer.c 1970-01-01 00:00:00 +0000
17+++ src/buffer.c 2013-01-07 06:22:20 +0000
18@@ -0,0 +1,197 @@
19+/******************************************************
20+Copyright (c) 2012 Percona Ireland Ltd.
21+
22+buffer datasink for XtraBackup.
23+
24+This program is free software; you can redistribute it and/or modify
25+it under the terms of the GNU General Public License as published by
26+the Free Software Foundation; version 2 of the License.
27+
28+This program is distributed in the hope that it will be useful,
29+but WITHOUT ANY WARRANTY; without even the implied warranty of
30+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
31+GNU General Public License for more details.
32+
33+You should have received a copy of the GNU General Public License
34+along with this program; if not, write to the Free Software
35+Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
36+
37+*******************************************************/
38+
39+/* Does buffered output to a destination datasink set with ds_set_pipe().
40+Writes to the destination datasink are guaranteed to not be smaller than a
41+specified buffer size (DS_DEFAULT_BUFFER_SIZE by default), with the only
42+exception for the last write for a file. */
43+
44+#include <my_base.h>
45+#include "common.h"
46+#include "datasink.h"
47+
48+#define DS_DEFAULT_BUFFER_SIZE (64 * 1024)
49+
50+typedef struct {
51+ ds_file_t *dst_file;
52+ char *buf;
53+ size_t pos;
54+ size_t size;
55+} ds_buffer_file_t;
56+
57+typedef struct {
58+ size_t buffer_size;
59+} ds_buffer_ctxt_t;
60+
61+static ds_ctxt_t *buffer_init(const char *root);
62+static ds_file_t *buffer_open(ds_ctxt_t *ctxt, const char *path,
63+ MY_STAT *mystat);
64+static int buffer_write(ds_file_t *file, const void *buf, size_t len);
65+static int buffer_close(ds_file_t *file);
66+static void buffer_deinit(ds_ctxt_t *ctxt);
67+
68+datasink_t datasink_buffer = {
69+ &buffer_init,
70+ &buffer_open,
71+ &buffer_write,
72+ &buffer_close,
73+ &buffer_deinit
74+};
75+
76+/* Change the default buffer size */
77+void ds_buffer_set_size(ds_ctxt_t *ctxt, size_t size)
78+{
79+ ds_buffer_ctxt_t *buffer_ctxt = (ds_buffer_ctxt_t *) ctxt->ptr;
80+
81+ buffer_ctxt->buffer_size = size;
82+}
83+
84+static ds_ctxt_t *
85+buffer_init(const char *root)
86+{
87+ ds_ctxt_t *ctxt;
88+ ds_buffer_ctxt_t *buffer_ctxt;
89+
90+ ctxt = my_malloc(sizeof(ds_ctxt_t) + sizeof(ds_buffer_ctxt_t),
91+ MYF(MY_FAE));
92+ buffer_ctxt = (ds_buffer_ctxt_t *) (ctxt + 1);
93+ buffer_ctxt->buffer_size = DS_DEFAULT_BUFFER_SIZE;
94+
95+ ctxt->ptr = buffer_ctxt;
96+ ctxt->root = my_strdup(root, MYF(MY_FAE));
97+ ctxt->datasink = &datasink_buffer;
98+
99+ return ctxt;
100+}
101+
102+static ds_file_t *
103+buffer_open(ds_ctxt_t *ctxt, const char *path, MY_STAT *mystat)
104+{
105+ ds_buffer_ctxt_t *buffer_ctxt;
106+ ds_ctxt_t *pipe_ctxt;
107+ ds_file_t *dst_file;
108+ ds_file_t *file;
109+ ds_buffer_file_t *buffer_file;
110+
111+ pipe_ctxt = ctxt->pipe_ctxt;
112+ xb_a(pipe_ctxt != NULL);
113+
114+ dst_file = pipe_ctxt->datasink->open(pipe_ctxt, path, mystat);
115+ if (dst_file == NULL) {
116+ exit(EXIT_FAILURE);
117+ }
118+
119+ dst_file->datasink = pipe_ctxt->datasink;
120+
121+ buffer_ctxt = (ds_buffer_ctxt_t *) ctxt->ptr;
122+
123+ file = (ds_file_t *) my_malloc(sizeof(ds_file_t) +
124+ sizeof(ds_buffer_file_t) +
125+ buffer_ctxt->buffer_size,
126+ MYF(MY_FAE));
127+
128+ buffer_file = (ds_buffer_file_t *) (file + 1);
129+ buffer_file->dst_file = dst_file;
130+ buffer_file->buf = (char *) (buffer_file + 1);
131+ buffer_file->size = buffer_ctxt->buffer_size;
132+ buffer_file->pos = 0;
133+
134+ file->path = dst_file->path;
135+ file->ptr = buffer_file;
136+
137+ return file;
138+}
139+
140+static int
141+buffer_write(ds_file_t *file, const void *buf, size_t len)
142+{
143+ ds_buffer_file_t *buffer_file;
144+ datasink_t *dest_ds;
145+
146+ buffer_file = (ds_buffer_file_t *) file->ptr;
147+
148+ dest_ds = buffer_file->dst_file->datasink;
149+
150+ while (len > 0) {
151+ if (buffer_file->pos + len > buffer_file->size) {
152+ if (buffer_file->pos > 0) {
153+ size_t bytes;
154+
155+ bytes = buffer_file->size - buffer_file->pos;
156+ memcpy(buffer_file->buf + buffer_file->pos, buf,
157+ bytes);
158+
159+ if (dest_ds->write(buffer_file->dst_file,
160+ buffer_file->buf,
161+ buffer_file->size)) {
162+ return 1;
163+ }
164+
165+ buffer_file->pos = 0;
166+
167+ buf = (const char *) buf + bytes;
168+ len -= bytes;
169+ } else {
170+ /* We don't have any buffered bytes, just write
171+ the entire source buffer */
172+ if (dest_ds->write(buffer_file->dst_file, buf,
173+ len)) {
174+ return 1;
175+ }
176+ break;
177+ }
178+ } else {
179+ memcpy(buffer_file->buf + buffer_file->pos, buf, len);
180+ buffer_file->pos += len;
181+ break;
182+ }
183+ }
184+
185+ return 0;
186+}
187+
188+static int
189+buffer_close(ds_file_t *file)
190+{
191+ ds_buffer_file_t *buffer_file;
192+ datasink_t *dest_ds;
193+
194+ buffer_file = (ds_buffer_file_t *) file->ptr;
195+
196+ dest_ds = buffer_file->dst_file->datasink;
197+
198+ if (buffer_file->pos > 0) {
199+ dest_ds->write(buffer_file->dst_file, buffer_file->buf,
200+ buffer_file->pos);
201+ }
202+
203+ dest_ds->close(buffer_file->dst_file);
204+
205+ MY_FREE(file);
206+
207+ return 0;
208+}
209+
210+static void
211+buffer_deinit(ds_ctxt_t *ctxt)
212+{
213+ MY_FREE(ctxt->root);
214+ MY_FREE(ctxt);
215+}
216
217=== added file 'src/buffer.h'
218--- src/buffer.h 1970-01-01 00:00:00 +0000
219+++ src/buffer.h 2013-01-07 06:22:20 +0000
220@@ -0,0 +1,31 @@
221+/******************************************************
222+Copyright (c) 2012 Percona Ireland Ltd.
223+
224+buffer datasink for XtraBackup.
225+
226+This program is free software; you can redistribute it and/or modify
227+it under the terms of the GNU General Public License as published by
228+the Free Software Foundation; version 2 of the License.
229+
230+This program is distributed in the hope that it will be useful,
231+but WITHOUT ANY WARRANTY; without even the implied warranty of
232+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
233+GNU General Public License for more details.
234+
235+You should have received a copy of the GNU General Public License
236+along with this program; if not, write to the Free Software
237+Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
238+
239+*******************************************************/
240+
241+#ifndef DS_BUFFER_H
242+#define DS_BUFFER_H
243+
244+#include "datasink.h"
245+
246+extern datasink_t datasink_buffer;
247+
248+/* Change the default buffer size */
249+void ds_buffer_set_size(ds_ctxt_t *ctxt, size_t size);
250+
251+#endif
252
253=== modified file 'src/compress.c'
254--- src/compress.c 2012-06-14 10:50:28 +0000
255+++ src/compress.c 2013-01-07 06:22:20 +0000
256@@ -27,6 +27,7 @@
257 #include "datasink.h"
258 #include "stream.h"
259 #include "local.h"
260+#include "buffer.h"
261
262 #define COMPRESS_CHUNK_SIZE (64 * 1024UL)
263 #define MY_QLZ_COMPRESS_OVERHEAD 400
264@@ -97,15 +98,26 @@
265 ds_ctxt_t *ctxt;
266 ds_compress_ctxt_t *compress_ctxt;
267 datasink_t *dest_ds;
268+ datasink_t *pipe_ds;
269 ds_ctxt_t *dest_ctxt;
270 comp_thread_ctxt_t *threads;
271
272+ dest_ds = &datasink_buffer;
273+
274+ dest_ctxt = dest_ds->init(root);
275+ if (dest_ctxt == NULL) {
276+ msg("compress: failed to initialize the buffer datasink.\n");
277+ return NULL;
278+ }
279+
280+ /* Use a 1 MB buffer for compressed output stream */
281+ ds_buffer_set_size(dest_ctxt, 1024 * 1024);
282+
283 /* Decide whether the compressed data will be stored in local files or
284 streamed to an archive */
285- dest_ds = xtrabackup_stream ? &datasink_stream : &datasink_local;
286-
287- dest_ctxt = dest_ds->init(root);
288- if (dest_ctxt == NULL) {
289+ pipe_ds = xtrabackup_stream ? &datasink_stream : &datasink_local;
290+ dest_ctxt->pipe_ctxt = pipe_ds->init(root);
291+ if (dest_ctxt->pipe_ctxt == NULL) {
292 msg("compress: failed to initialize the target datasink.\n");
293 return NULL;
294 }
295@@ -323,7 +335,9 @@
296 {
297 ds_compress_ctxt_t *comp_ctxt;
298 ds_ctxt_t *dest_ctxt;
299+ ds_ctxt_t *pipe_ctxt;
300 datasink_t *dest_ds;
301+ datasink_t *pipe_ds;
302
303 comp_ctxt = (ds_compress_ctxt_t *) ctxt->ptr;;
304
305@@ -332,7 +346,11 @@
306 dest_ctxt = comp_ctxt->dest_ctxt;
307 dest_ds = dest_ctxt->datasink;
308
309+ pipe_ctxt = dest_ctxt->pipe_ctxt;
310+ pipe_ds = pipe_ctxt->datasink;
311+
312 dest_ds->deinit(dest_ctxt);
313+ pipe_ds->deinit(pipe_ctxt);
314
315 MY_FREE(ctxt);
316 }
317
318=== modified file 'src/datasink.h'
319--- src/datasink.h 2012-02-10 20:05:56 +0000
320+++ src/datasink.h 2013-01-07 06:22:20 +0000
321@@ -26,15 +26,17 @@
322
323 struct datasink_struct;
324
325-typedef struct {
326+typedef struct ds_ctxt {
327 struct datasink_struct *datasink;
328 char *root;
329 void *ptr;
330+ struct ds_ctxt *pipe_ctxt;
331 } ds_ctxt_t;
332
333 typedef struct {
334- void *ptr;
335- char *path;
336+ void *ptr;
337+ char *path;
338+ struct datasink_struct *datasink;
339 } ds_file_t;
340
341 typedef struct datasink_struct {
342
343=== added file 'test/t/ib_compress_basic.sh'
344--- test/t/ib_compress_basic.sh 1970-01-01 00:00:00 +0000
345+++ test/t/ib_compress_basic.sh 2013-01-07 06:22:20 +0000
346@@ -0,0 +1,32 @@
347+########################################################################
348+# Basic test for local compressed backups
349+########################################################################
350+
351+. inc/common.sh
352+
353+if ! which qpress > /dev/null 2>&1 ; then
354+ echo "Requires qpress to be installed" > $SKIPPED_REASON
355+ exit $SKIPPED_EXIT_CODE
356+fi
357+
358+start_server --innodb_file_per_table
359+load_sakila
360+
361+innobackupex --compress --no-timestamp $topdir/backup
362+
363+stop_server
364+rm -rf ${MYSQLD_DATADIR}/*
365+
366+cd $topdir/backup
367+
368+for i in *.qp; do qpress -d $i ./; done; \
369+for i in sakila/*.qp; do qpress -d $i sakila/; done
370+
371+innobackupex --apply-log $topdir/backup
372+
373+innobackupex --copy-back $topdir/backup
374+
375+start_server
376+
377+# TODO: proper validation
378+run_cmd ${MYSQL} ${MYSQL_ARGS} -e "SELECT count(*) from actor" sakila

Subscribers

People subscribed via source and target branches