Merge lp:~akopytov/percona-xtrabackup/bug1095249-2.0 into lp:percona-xtrabackup/2.0

Proposed by Alexey Kopytov on 2013-01-03
Status: Merged
Approved by: Laurynas Biveinis on 2013-01-06
Approved revision: no longer in the source branch.
Merged at revision: 494
Proposed branch: lp:~akopytov/percona-xtrabackup/bug1095249-2.0
Merge into: lp:percona-xtrabackup/2.0
Diff against target: 378 lines (+289/-8)
6 files modified
src/Makefile (+2/-1)
src/buffer.c (+197/-0)
src/buffer.h (+31/-0)
src/compress.c (+22/-4)
src/datasink.h (+5/-3)
test/t/ib_compress_basic.sh (+32/-0)
To merge this branch: bzr merge lp:~akopytov/percona-xtrabackup/bug1095249-2.0
Reviewer Review Type Date Requested Status
Laurynas Biveinis (community) Approve on 2013-01-06
George Ormond Lorch III g2 2013-01-03 Approve on 2013-01-03
Review via email: mp+141769@code.launchpad.net
To post a comment you must log in.
George Ormond Lorch III (gl-az) wrote :

I see now why you re-wrote the ds chaining in 2.1. I don't like that the compression sink needs to manage its connection to the next sink (the buffer in this case) rather than having it done at the higher level in xtrabackup.c, as is now done in 2.1.

Anyway, looks like a reasonable fix to the issue.

review: Approve (g2)
Alexey Kopytov (akopytov) wrote :

http://jenkins.percona.com/view/XtraBackup/job/percona-xtrabackup-2.0-param/319/ (though compression tests are not currently executed in Jenkins anyway).

The commit message includes the title of a wrong bug.
OK with it fixed.

review: Approve
491. By Alexey Kopytov on 2013-01-07

Bug #1095249: Built-in compression does inefficient I/O

The problem was that xtrabackup --compress did unbuffered writes to the
destination file or stream in very small chunks.

Fixed by:

- backporting datasink_buffer from the compact backups branch
- inserting it between datasink_compress and the destination datasink
- using a 1M buffer for output similar to the uncompressed backup

Alexey Kopytov (akopytov) wrote :

Hi Laurynas,

On Sun, 06 Jan 2013 11:38:21 -0000, Laurynas Biveinis wrote:
> Review: Approve
>
> The commit message includes the title of a wrong bug.
> OK with it fixed.
>

Fixed.

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'src/Makefile'
--- src/Makefile 2012-02-10 20:05:56 +0000
+++ src/Makefile 2013-01-07 06:22:20 +0000
@@ -23,7 +23,8 @@
23BIN_DIR=$(PREFIX)/bin23BIN_DIR=$(PREFIX)/bin
2424
25COMMON_INC = -I. -I libarchive/libarchive -I quicklz25COMMON_INC = -I. -I libarchive/libarchive -I quicklz
26XTRABACKUPOBJS = xtrabackup.o stream.o local.o compress.o xbstream_write.o \26XTRABACKUPOBJS = xtrabackup.o stream.o local.o compress.o buffer.o \
27 xbstream_write.o \
27 quicklz/quicklz.o28 quicklz/quicklz.o
28XBSTREAMOBJS = xbstream.o xbstream_write.o xbstream_read.o29XBSTREAMOBJS = xbstream.o xbstream_write.o xbstream_read.o
2930
3031
=== added file 'src/buffer.c'
--- src/buffer.c 1970-01-01 00:00:00 +0000
+++ src/buffer.c 2013-01-07 06:22:20 +0000
@@ -0,0 +1,197 @@
1/******************************************************
2Copyright (c) 2012 Percona Ireland Ltd.
3
4buffer datasink for XtraBackup.
5
6This program is free software; you can redistribute it and/or modify
7it under the terms of the GNU General Public License as published by
8the Free Software Foundation; version 2 of the License.
9
10This program is distributed in the hope that it will be useful,
11but WITHOUT ANY WARRANTY; without even the implied warranty of
12MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13GNU General Public License for more details.
14
15You should have received a copy of the GNU General Public License
16along with this program; if not, write to the Free Software
17Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18
19*******************************************************/
20
21/* Does buffered output to a destination datasink set with ds_set_pipe().
22Writes to the destination datasink are guaranteed to not be smaller than a
23specified buffer size (DS_DEFAULT_BUFFER_SIZE by default), with the only
24exception for the last write for a file. */
25
26#include <my_base.h>
27#include "common.h"
28#include "datasink.h"
29
30#define DS_DEFAULT_BUFFER_SIZE (64 * 1024)
31
32typedef struct {
33 ds_file_t *dst_file;
34 char *buf;
35 size_t pos;
36 size_t size;
37} ds_buffer_file_t;
38
39typedef struct {
40 size_t buffer_size;
41} ds_buffer_ctxt_t;
42
43static ds_ctxt_t *buffer_init(const char *root);
44static ds_file_t *buffer_open(ds_ctxt_t *ctxt, const char *path,
45 MY_STAT *mystat);
46static int buffer_write(ds_file_t *file, const void *buf, size_t len);
47static int buffer_close(ds_file_t *file);
48static void buffer_deinit(ds_ctxt_t *ctxt);
49
50datasink_t datasink_buffer = {
51 &buffer_init,
52 &buffer_open,
53 &buffer_write,
54 &buffer_close,
55 &buffer_deinit
56};
57
58/* Change the default buffer size */
59void ds_buffer_set_size(ds_ctxt_t *ctxt, size_t size)
60{
61 ds_buffer_ctxt_t *buffer_ctxt = (ds_buffer_ctxt_t *) ctxt->ptr;
62
63 buffer_ctxt->buffer_size = size;
64}
65
66static ds_ctxt_t *
67buffer_init(const char *root)
68{
69 ds_ctxt_t *ctxt;
70 ds_buffer_ctxt_t *buffer_ctxt;
71
72 ctxt = my_malloc(sizeof(ds_ctxt_t) + sizeof(ds_buffer_ctxt_t),
73 MYF(MY_FAE));
74 buffer_ctxt = (ds_buffer_ctxt_t *) (ctxt + 1);
75 buffer_ctxt->buffer_size = DS_DEFAULT_BUFFER_SIZE;
76
77 ctxt->ptr = buffer_ctxt;
78 ctxt->root = my_strdup(root, MYF(MY_FAE));
79 ctxt->datasink = &datasink_buffer;
80
81 return ctxt;
82}
83
84static ds_file_t *
85buffer_open(ds_ctxt_t *ctxt, const char *path, MY_STAT *mystat)
86{
87 ds_buffer_ctxt_t *buffer_ctxt;
88 ds_ctxt_t *pipe_ctxt;
89 ds_file_t *dst_file;
90 ds_file_t *file;
91 ds_buffer_file_t *buffer_file;
92
93 pipe_ctxt = ctxt->pipe_ctxt;
94 xb_a(pipe_ctxt != NULL);
95
96 dst_file = pipe_ctxt->datasink->open(pipe_ctxt, path, mystat);
97 if (dst_file == NULL) {
98 exit(EXIT_FAILURE);
99 }
100
101 dst_file->datasink = pipe_ctxt->datasink;
102
103 buffer_ctxt = (ds_buffer_ctxt_t *) ctxt->ptr;
104
105 file = (ds_file_t *) my_malloc(sizeof(ds_file_t) +
106 sizeof(ds_buffer_file_t) +
107 buffer_ctxt->buffer_size,
108 MYF(MY_FAE));
109
110 buffer_file = (ds_buffer_file_t *) (file + 1);
111 buffer_file->dst_file = dst_file;
112 buffer_file->buf = (char *) (buffer_file + 1);
113 buffer_file->size = buffer_ctxt->buffer_size;
114 buffer_file->pos = 0;
115
116 file->path = dst_file->path;
117 file->ptr = buffer_file;
118
119 return file;
120}
121
122static int
123buffer_write(ds_file_t *file, const void *buf, size_t len)
124{
125 ds_buffer_file_t *buffer_file;
126 datasink_t *dest_ds;
127
128 buffer_file = (ds_buffer_file_t *) file->ptr;
129
130 dest_ds = buffer_file->dst_file->datasink;
131
132 while (len > 0) {
133 if (buffer_file->pos + len > buffer_file->size) {
134 if (buffer_file->pos > 0) {
135 size_t bytes;
136
137 bytes = buffer_file->size - buffer_file->pos;
138 memcpy(buffer_file->buf + buffer_file->pos, buf,
139 bytes);
140
141 if (dest_ds->write(buffer_file->dst_file,
142 buffer_file->buf,
143 buffer_file->size)) {
144 return 1;
145 }
146
147 buffer_file->pos = 0;
148
149 buf = (const char *) buf + bytes;
150 len -= bytes;
151 } else {
152 /* We don't have any buffered bytes, just write
153 the entire source buffer */
154 if (dest_ds->write(buffer_file->dst_file, buf,
155 len)) {
156 return 1;
157 }
158 break;
159 }
160 } else {
161 memcpy(buffer_file->buf + buffer_file->pos, buf, len);
162 buffer_file->pos += len;
163 break;
164 }
165 }
166
167 return 0;
168}
169
170static int
171buffer_close(ds_file_t *file)
172{
173 ds_buffer_file_t *buffer_file;
174 datasink_t *dest_ds;
175
176 buffer_file = (ds_buffer_file_t *) file->ptr;
177
178 dest_ds = buffer_file->dst_file->datasink;
179
180 if (buffer_file->pos > 0) {
181 dest_ds->write(buffer_file->dst_file, buffer_file->buf,
182 buffer_file->pos);
183 }
184
185 dest_ds->close(buffer_file->dst_file);
186
187 MY_FREE(file);
188
189 return 0;
190}
191
192static void
193buffer_deinit(ds_ctxt_t *ctxt)
194{
195 MY_FREE(ctxt->root);
196 MY_FREE(ctxt);
197}
0198
=== added file 'src/buffer.h'
--- src/buffer.h 1970-01-01 00:00:00 +0000
+++ src/buffer.h 2013-01-07 06:22:20 +0000
@@ -0,0 +1,31 @@
1/******************************************************
2Copyright (c) 2012 Percona Ireland Ltd.
3
4buffer datasink for XtraBackup.
5
6This program is free software; you can redistribute it and/or modify
7it under the terms of the GNU General Public License as published by
8the Free Software Foundation; version 2 of the License.
9
10This program is distributed in the hope that it will be useful,
11but WITHOUT ANY WARRANTY; without even the implied warranty of
12MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13GNU General Public License for more details.
14
15You should have received a copy of the GNU General Public License
16along with this program; if not, write to the Free Software
17Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18
19*******************************************************/
20
21#ifndef DS_BUFFER_H
22#define DS_BUFFER_H
23
24#include "datasink.h"
25
26extern datasink_t datasink_buffer;
27
28/* Change the default buffer size */
29void ds_buffer_set_size(ds_ctxt_t *ctxt, size_t size);
30
31#endif
032
=== modified file 'src/compress.c'
--- src/compress.c 2012-06-14 10:50:28 +0000
+++ src/compress.c 2013-01-07 06:22:20 +0000
@@ -27,6 +27,7 @@
27#include "datasink.h"27#include "datasink.h"
28#include "stream.h"28#include "stream.h"
29#include "local.h"29#include "local.h"
30#include "buffer.h"
3031
31#define COMPRESS_CHUNK_SIZE (64 * 1024UL)32#define COMPRESS_CHUNK_SIZE (64 * 1024UL)
32#define MY_QLZ_COMPRESS_OVERHEAD 40033#define MY_QLZ_COMPRESS_OVERHEAD 400
@@ -97,15 +98,26 @@
97 ds_ctxt_t *ctxt;98 ds_ctxt_t *ctxt;
98 ds_compress_ctxt_t *compress_ctxt;99 ds_compress_ctxt_t *compress_ctxt;
99 datasink_t *dest_ds;100 datasink_t *dest_ds;
101 datasink_t *pipe_ds;
100 ds_ctxt_t *dest_ctxt;102 ds_ctxt_t *dest_ctxt;
101 comp_thread_ctxt_t *threads;103 comp_thread_ctxt_t *threads;
102104
105 dest_ds = &datasink_buffer;
106
107 dest_ctxt = dest_ds->init(root);
108 if (dest_ctxt == NULL) {
109 msg("compress: failed to initialize the buffer datasink.\n");
110 return NULL;
111 }
112
113 /* Use a 1 MB buffer for compressed output stream */
114 ds_buffer_set_size(dest_ctxt, 1024 * 1024);
115
103 /* Decide whether the compressed data will be stored in local files or116 /* Decide whether the compressed data will be stored in local files or
104 streamed to an archive */117 streamed to an archive */
105 dest_ds = xtrabackup_stream ? &datasink_stream : &datasink_local;118 pipe_ds = xtrabackup_stream ? &datasink_stream : &datasink_local;
106119 dest_ctxt->pipe_ctxt = pipe_ds->init(root);
107 dest_ctxt = dest_ds->init(root);120 if (dest_ctxt->pipe_ctxt == NULL) {
108 if (dest_ctxt == NULL) {
109 msg("compress: failed to initialize the target datasink.\n");121 msg("compress: failed to initialize the target datasink.\n");
110 return NULL;122 return NULL;
111 }123 }
@@ -323,7 +335,9 @@
323{335{
324 ds_compress_ctxt_t *comp_ctxt;336 ds_compress_ctxt_t *comp_ctxt;
325 ds_ctxt_t *dest_ctxt;337 ds_ctxt_t *dest_ctxt;
338 ds_ctxt_t *pipe_ctxt;
326 datasink_t *dest_ds;339 datasink_t *dest_ds;
340 datasink_t *pipe_ds;
327341
328 comp_ctxt = (ds_compress_ctxt_t *) ctxt->ptr;;342 comp_ctxt = (ds_compress_ctxt_t *) ctxt->ptr;;
329343
@@ -332,7 +346,11 @@
332 dest_ctxt = comp_ctxt->dest_ctxt;346 dest_ctxt = comp_ctxt->dest_ctxt;
333 dest_ds = dest_ctxt->datasink;347 dest_ds = dest_ctxt->datasink;
334348
349 pipe_ctxt = dest_ctxt->pipe_ctxt;
350 pipe_ds = pipe_ctxt->datasink;
351
335 dest_ds->deinit(dest_ctxt);352 dest_ds->deinit(dest_ctxt);
353 pipe_ds->deinit(pipe_ctxt);
336354
337 MY_FREE(ctxt);355 MY_FREE(ctxt);
338}356}
339357
=== modified file 'src/datasink.h'
--- src/datasink.h 2012-02-10 20:05:56 +0000
+++ src/datasink.h 2013-01-07 06:22:20 +0000
@@ -26,15 +26,17 @@
2626
27struct datasink_struct;27struct datasink_struct;
2828
29typedef struct {29typedef struct ds_ctxt {
30 struct datasink_struct *datasink;30 struct datasink_struct *datasink;
31 char *root;31 char *root;
32 void *ptr;32 void *ptr;
33 struct ds_ctxt *pipe_ctxt;
33} ds_ctxt_t;34} ds_ctxt_t;
3435
35typedef struct {36typedef struct {
36 void *ptr;37 void *ptr;
37 char *path;38 char *path;
39 struct datasink_struct *datasink;
38} ds_file_t;40} ds_file_t;
3941
40typedef struct datasink_struct {42typedef struct datasink_struct {
4143
=== added file 'test/t/ib_compress_basic.sh'
--- test/t/ib_compress_basic.sh 1970-01-01 00:00:00 +0000
+++ test/t/ib_compress_basic.sh 2013-01-07 06:22:20 +0000
@@ -0,0 +1,32 @@
1########################################################################
2# Basic test for local compressed backups
3########################################################################
4
5. inc/common.sh
6
7if ! which qpress > /dev/null 2>&1 ; then
8 echo "Requires qpress to be installed" > $SKIPPED_REASON
9 exit $SKIPPED_EXIT_CODE
10fi
11
12start_server --innodb_file_per_table
13load_sakila
14
15innobackupex --compress --no-timestamp $topdir/backup
16
17stop_server
18rm -rf ${MYSQLD_DATADIR}/*
19
20cd $topdir/backup
21
22for i in *.qp; do qpress -d $i ./; done; \
23for i in sakila/*.qp; do qpress -d $i sakila/; done
24
25innobackupex --apply-log $topdir/backup
26
27innobackupex --copy-back $topdir/backup
28
29start_server
30
31# TODO: proper validation
32run_cmd ${MYSQL} ${MYSQL_ARGS} -e "SELECT count(*) from actor" sakila

Subscribers

People subscribed via source and target branches