Merge lp:~sergei.glushchenko/percona-xtrabackup/20-decompress into lp:percona-xtrabackup/2.0

Proposed by Sergei Glushchenko
Status: Rejected
Rejected by: Alexey Kopytov
Proposed branch: lp:~sergei.glushchenko/percona-xtrabackup/20-decompress
Merge into: lp:percona-xtrabackup/2.0
Diff against target: 634 lines (+518/-5)
6 files modified
src/Makefile (+2/-2)
src/datasink.h (+1/-0)
src/decompress.c (+453/-0)
src/decompress.h (+28/-0)
src/xbstream.c (+25/-3)
test/t/ib_stream_decompress.sh (+9/-0)
To merge this branch: bzr merge lp:~sergei.glushchenko/percona-xtrabackup/20-decompress
Reviewer                     Review Type   Date Requested   Status
Alexey Kopytov (community)                                  Disapprove
Review via email: mp+120287@code.launchpad.net

Description of the change

BP https://blueprints.launchpad.net/percona-xtrabackup/+spec/xbstream-decompress
  Support for decompression in xbstream. A new option, -z, has been added to
  automatically decompress .qp files inside the stream. A decompress datasink
  has been implemented to extract qpress10 single-file archives.
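
For readers unfamiliar with the format, the header fields that the proposed src/decompress.c reads from a qpress10 single-file archive can be sketched as below. This is only an illustration inferred from the patch in this proposal, not a specification of qpress. The names qp_header_t, read_le and qp_parse_header are hypothetical, and the trailing NUL after the file name is an assumption based on the patch reading filename_len + 1 bytes.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical container for the header fields the patch reads. */
typedef struct {
	uint64_t chunk_size;	/* chunk size announced by the archive */
	uint32_t name_len;	/* file name length, without the trailing NUL */
	const char *name;	/* points into the caller's buffer */
	size_t header_len;	/* bytes consumed by the whole header */
} qp_header_t;

/* Read an n-byte little-endian integer (what uint8korr/uint4korr do). */
uint64_t read_le(const unsigned char *p, size_t n)
{
	uint64_t v = 0;

	while (n-- > 0)
		v = (v << 8) | p[n];
	return v;
}

/* Returns 0 on success, -1 if buf is not a usable single-file header. */
int qp_parse_header(const unsigned char *buf, size_t len, qp_header_t *h)
{
	/* magic (8) + chunk size (8) + 'F' marker (1) + name length (4) */
	const size_t fixed = 8 + 8 + 1 + 4;

	assert(buf != NULL && h != NULL);

	if (len < fixed || memcmp(buf, "qpress10", 8) != 0)
		return -1;
	h->chunk_size = read_le(buf + 8, 8);
	if (h->chunk_size > 512UL * 1024 * 1024)
		return -1;	/* same sanity limit as the patch */
	if (buf[16] != 'F')
		return -1;	/* the patch handles single-file archives only */
	h->name_len = (uint32_t) read_le(buf + 17, 4);
	if (h->name_len >= len - fixed)
		return -1;	/* name plus its NUL must fit in the buffer */
	h->name = (const char *) buf + fixed;
	if (h->name[h->name_len] != '\0')
		return -1;
	h->header_len = fixed + h->name_len + 1;
	return 0;
}
```

After a successful call, the compressed chunks (each introduced by "NEWBNEWB" in the patch) would start at buf + header_len.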

#24660

Stewart Smith (stewart) wrote :

I have three thoughts:
- Should we add this to 2.0 at all? It is adding a feature and a 2.1 release probably isn't far off.
- I think that certainly in 2.1, this should be default behaviour, as it's almost certainly what the user wanted.
- parallel decompression?

Sergei Glushchenko (sergei.glushchenko) wrote :

> I have three thoughts:
> - Should we add this to 2.0 at all? It is adding a feature and a 2.1 release
> probably isn't far off.

We have a customer using 2.0 who wants this feature.

> - I think that certainly in 2.1, this should be default behaviour, as it's
> almost certainly what the user wanted.

Yes, probably you're right.

> - parallel decompression?

I've experimented with parallel decompression, implementing it similarly to parallel compression, and it didn't give me any performance gain. It was even slower than the thread-per-file implementation. Do you have any specific proposals for how to implement parallel decompression?

Stewart Smith (stewart) wrote :

> > - parallel decompression?
>
> I've experimented with parallel decompression, implementing it similar to
> parallel compression and it didn't give me any performance gain. It was even
> slower than thread-per-file implementation. Do you have any specific proposals
> how to implement parallel decompression?

Thread per file is fine.
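
For illustration only, a thread-per-file scheme like the one discussed here can be sketched with plain pthreads as below. This is not code from the proposed branch: file_job_t, run_thread_per_file and check_qp_suffix are hypothetical names, and the worker merely checks for a ".qp" suffix where a real worker would decompress the file.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical per-file job; work() stands in for "decompress this file". */
typedef struct {
	const char *path;
	int (*work)(const char *path);	/* returns 0 on success */
	int result;
	pthread_t tid;
} file_job_t;

/* Stand-in worker: succeeds only for paths ending in ".qp". */
int check_qp_suffix(const char *path)
{
	size_t len = strlen(path);

	return (len > 3 && strcmp(path + len - 3, ".qp") == 0) ? 0 : 1;
}

/* Thread entry point: run the job and record its result. */
void *job_main(void *arg)
{
	file_job_t *job = arg;

	job->result = job->work(job->path);
	return NULL;
}

/* Start one thread per file, then join them all.
   Returns the number of jobs that failed or could not be started. */
int run_thread_per_file(file_job_t *jobs, size_t n)
{
	size_t started = 0, failed = 0, i;

	assert(jobs != NULL || n == 0);

	for (i = 0; i < n; i++) {
		if (pthread_create(&jobs[i].tid, NULL, job_main, &jobs[i]) != 0)
			break;
		started++;
	}
	for (i = 0; i < started; i++) {
		pthread_join(jobs[i].tid, NULL);
		if (jobs[i].result != 0)
			failed++;
	}
	return (int) (failed + (n - started));
}
```

Each file is handled by exactly one thread, so no coordination is needed inside a file; the only shared step is the final join.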

Alexey Kopytov (akopytov) wrote :

As discussed on IRC, the feature will be implemented by adding a --decompress-program option to xbstream so that an external utility such as qpress can be used.
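
The --decompress-program option is not specified any further in this thread. As a rough sketch of the general idea only (handing data to an external utility through a pipe), something like the following could be used. pipe_to_program is a hypothetical helper, the command string is whatever the caller supplies, and no particular qpress command-line flags are assumed.

```c
#define _POSIX_C_SOURCE 200809L

#include <assert.h>
#include <stdio.h>
#include <sys/wait.h>

/* Feed len bytes to the standard input of an external command.
   Returns 0 if all bytes were written and the command exited with status 0,
   -1 otherwise. A real implementation would also have to deal with SIGPIPE
   when the command exits before reading its input. */
int pipe_to_program(const char *command, const void *buf, size_t len)
{
	FILE *p;
	size_t written;
	int status;

	assert(command != NULL);

	p = popen(command, "w");
	if (p == NULL)
		return -1;
	written = fwrite(buf, 1, len, p);
	status = pclose(p);	/* flushes the pipe and waits for the command */
	if (written != len || status == -1)
		return -1;
	return (WIFEXITED(status) && WEXITSTATUS(status) == 0) ? 0 : -1;
}
```

The exit status is checked so that a failing external utility is reported to the caller instead of being silently ignored.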

Alexey Kopytov (akopytov) wrote :

After further analysis and discussion, it was decided that this feature has no real use cases, while it complicates the xbstream architecture significantly.

Users who want compression only to speed up transferring the entire archive to another host do not need built-in compression in XtraBackup; that feature was implemented for different use cases. Instead, they can use a normal uncompressed stream in XtraBackup (either tar, or xbstream in the case of parallel or incremental backups) and then use external compression utilities (single- or multi-threaded) to compress the stream and decompress it on the remote host.

review: Disapprove

Unmerged revisions

462. By Sergei Glushchenko

BP https://blueprints.launchpad.net/percona-xtrabackup/+spec/xbstream-decompress
Support for decompression in xbstream. A new option, -z, has been added to
automatically decompress .qp files inside the stream. A decompress datasink
has been implemented to extract qpress10 one-file archives.

Preview Diff

=== modified file 'src/Makefile'
--- src/Makefile 2012-02-10 20:05:56 +0000
+++ src/Makefile 2012-08-19 12:20:47 +0000
@@ -24,7 +24,7 @@
 
 COMMON_INC = -I. -I libarchive/libarchive -I quicklz
 XTRABACKUPOBJS = xtrabackup.o stream.o local.o compress.o xbstream_write.o \
-	quicklz/quicklz.o
+	quicklz/quicklz.o decompress.o
 XBSTREAMOBJS = xbstream.o xbstream_write.o xbstream_read.o
 
 LIBARCHIVE_A = libarchive/libarchive/libarchive.a
@@ -176,7 +176,7 @@
 xbstream.o xbstream_read.o: %.o: %.c
 	$(CC) $(CFLAGS) $(INC) $(DEFS) -c $< -o $@
 
-xbstream: $(XBSTREAMOBJS) $(MYSQLOBJS) local.o
+xbstream: $(XBSTREAMOBJS) $(MYSQLOBJS) local.o decompress.o quicklz/quicklz.o
 	$(CC) $(CFLAGS) $^ $(INC) $(MYSQLOBJS) $(LIBS) -o $@
 
 xtrabackup.o: xtrabackup.c xb_regex.h

=== modified file 'src/datasink.h'
--- src/datasink.h 2012-02-10 20:05:56 +0000
+++ src/datasink.h 2012-08-19 12:20:47 +0000
@@ -30,6 +30,7 @@
 	struct datasink_struct *datasink;
 	char *root;
 	void *ptr;
+	struct ds_ctxt *pipe_ctxt;
 } ds_ctxt_t;
 
 typedef struct {

=== added file 'src/decompress.c'
--- src/decompress.c 1970-01-01 00:00:00 +0000
+++ src/decompress.c 2012-08-19 12:20:47 +0000
@@ -0,0 +1,453 @@
+/******************************************************
+Copyright (c) 2011 Percona Inc.
+
+Compressing datasink implementation for XtraBackup.
+
+This program is free software; you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation; version 2 of the License.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program; if not, write to the Free Software
+Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
+*******************************************************/
+
+
+#include <my_base.h>
+#include <quicklz.h>
+#include <univ.i>
+#include <zlib.h>
+#include "common.h"
+#include "datasink.h"
+
+#define COMPRESS_CHUNK_SIZE (64 * 1024UL)
+#define MY_QLZ_COMPRESS_OVERHEAD 400
+
+#define DS_DECOMPRESS_BUF_SIZE (COMPRESS_CHUNK_SIZE * 10UL)
+
+typedef struct {
+	pthread_t id;
+	pthread_mutex_t data_mutex;
+	pthread_cond_t not_full_cond;
+	pthread_cond_t not_empty_cond;
+	char data[DS_DECOMPRESS_BUF_SIZE];
+	char* rptr;
+	char* wptr;
+	size_t bytes_avail;
+	size_t data_size;
+	my_bool eof;
+	my_bool error;
+	ds_ctxt_t *ds;
+} ds_decomp_ctxt_t;
+
+static
+void *
+deqpress_worker_thread_func(void *arg);
+
+static ds_ctxt_t *decomp_init(const char *root);
+static ds_file_t *decomp_open(ds_ctxt_t *ctxt, const char *path,
+			MY_STAT *mystat);
+static int decomp_write(ds_file_t *file, const void *buf, size_t len);
+static int decomp_close(ds_file_t *file);
+static void decomp_deinit(ds_ctxt_t *ctxt);
+
+datasink_t datasink_decompress = {
+	&decomp_init,
+	&decomp_open,
+	&decomp_write,
+	&decomp_close,
+	&decomp_deinit
+};
+
+static
+ds_ctxt_t *
+decomp_init(const char *root)
+{
+	ds_ctxt_t *ctxt;
+
+	ctxt = (ds_ctxt_t *) my_malloc(sizeof(ds_ctxt_t),
+			MYF(MY_FAE));
+
+	ctxt->ptr = NULL;
+	ctxt->root = my_strdup(root, MYF(MY_FAE));
+	ctxt->datasink = &datasink_decompress;
+
+	return ctxt;
+}
+
+static
+void
+decomp_deinit(ds_ctxt_t *ctxt)
+{
+	MY_FREE(ctxt->root);
+	MY_FREE(ctxt);
+}
+
+static
+ds_file_t *
+decomp_open(ds_ctxt_t *ctxt, const char *path,
+		MY_STAT *mystat __attribute__((__unused__)))
+{
+	ds_decomp_ctxt_t *decomp_ctxt;
+	ds_file_t *file;
+
+	xb_a(ctxt->pipe_ctxt != NULL);
+
+	file = (ds_file_t *) my_malloc(sizeof(ds_file_t) +
+			sizeof(ds_decomp_ctxt_t),
+			MYF(MY_FAE));
+
+	decomp_ctxt = (ds_decomp_ctxt_t *) (file + 1);
+	file->ptr = decomp_ctxt;
+	file->path = my_strdup(path, MYF(MY_FAE));
+
+	decomp_ctxt->rptr =
+		decomp_ctxt->wptr = decomp_ctxt->data;
+	decomp_ctxt->eof = decomp_ctxt->error = FALSE;
+	decomp_ctxt->bytes_avail = 0;
+	decomp_ctxt->data_size = DS_DECOMPRESS_BUF_SIZE;
+	decomp_ctxt->ds = (ds_ctxt_t *) ctxt->pipe_ctxt;
+
+	pthread_mutex_init(&decomp_ctxt->data_mutex, NULL);
+	pthread_cond_init(&decomp_ctxt->not_full_cond, NULL);
+	pthread_cond_init(&decomp_ctxt->not_empty_cond, NULL);
+
+	if (pthread_create(&decomp_ctxt->id, NULL,
+			deqpress_worker_thread_func, file)) {
+		msg("compress: pthread_create() failed: "
+			"errno = %d\n", errno);
+		return NULL;
+	}
+
+	return file;
+}
+
+static
+int
+decomp_write(ds_file_t *file, const void *buf, size_t len)
+{
+	ds_decomp_ctxt_t *decomp_ctxt;
+	const char *ptr = (const char*) buf;
+
+	decomp_ctxt = (ds_decomp_ctxt_t *) file->ptr;
+
+	while (len > 0) {
+
+		size_t n;
+
+		pthread_mutex_lock(&decomp_ctxt->data_mutex);
+
+		while ((decomp_ctxt->bytes_avail
+				== decomp_ctxt->data_size) &&
+				!decomp_ctxt->error) {
+			pthread_cond_wait(
+				&decomp_ctxt->not_full_cond,
+				&decomp_ctxt->data_mutex);
+		}
+
+		if (decomp_ctxt->error) {
+			pthread_mutex_unlock(&decomp_ctxt->data_mutex);
+			return 1;
+		}
+
+		n = len < (decomp_ctxt->data_size - decomp_ctxt->bytes_avail) ?
+			len : (decomp_ctxt->data_size - decomp_ctxt->bytes_avail);
+
+		if (decomp_ctxt->wptr + n >
+				decomp_ctxt->data + decomp_ctxt->data_size) {
+			n = decomp_ctxt->data + decomp_ctxt->data_size -
+				decomp_ctxt->wptr;
+		}
+
+		memcpy(decomp_ctxt->wptr, ptr, n);
+
+		len -= n;
+		ptr += n;
+		decomp_ctxt->wptr += n;
+		decomp_ctxt->bytes_avail += n;
+
+		if (decomp_ctxt->wptr ==
+				decomp_ctxt->data + decomp_ctxt->data_size) {
+			decomp_ctxt->wptr = decomp_ctxt->data;
+		}
+
+		if (decomp_ctxt->bytes_avail > decomp_ctxt->data_size / 5)
+			pthread_cond_signal(&decomp_ctxt->not_empty_cond);
+
+		pthread_mutex_unlock(&decomp_ctxt->data_mutex);
+
+	}
+
+	return 0;
+}
+
+static
+my_bool
+decomp_read(ds_file_t *file, const void *buf, size_t len)
+{
+	ds_decomp_ctxt_t *decomp_ctxt;
+	char *ptr = (char*) buf;
+
+	decomp_ctxt = (ds_decomp_ctxt_t *) file->ptr;
+
+	while (len > 0) {
+
+		size_t n;
+
+		pthread_mutex_lock(&decomp_ctxt->data_mutex);
+
+		while (decomp_ctxt->bytes_avail == 0 &&
+				!decomp_ctxt->eof) {
+			pthread_cond_wait(
+				&decomp_ctxt->not_empty_cond,
+				&decomp_ctxt->data_mutex);
+		}
+
+		if (decomp_ctxt->bytes_avail == 0 &&
+				decomp_ctxt->eof) {
+			pthread_mutex_unlock(&decomp_ctxt->data_mutex);
+			return FALSE;
+		}
+
+		n = len < decomp_ctxt->bytes_avail ?
+			len : decomp_ctxt->bytes_avail;
+
+
+		if (decomp_ctxt->rptr + n >
+				decomp_ctxt->data + decomp_ctxt->data_size) {
+			n = decomp_ctxt->data + decomp_ctxt->data_size -
+				decomp_ctxt->rptr;
+		}
+
+		if (ptr) {
+			memcpy(ptr, decomp_ctxt->rptr, n);
+			ptr += n;
+		}
+
+		len -= n;
+		decomp_ctxt->rptr += n;
+		decomp_ctxt->bytes_avail -= n;
+
+		if (decomp_ctxt->rptr ==
+				decomp_ctxt->data + decomp_ctxt->data_size) {
+			decomp_ctxt->rptr = decomp_ctxt->data;
+		}
+
+		pthread_cond_signal(&decomp_ctxt->not_full_cond);
+
+		pthread_mutex_unlock(&decomp_ctxt->data_mutex);
+
+	}
+
+	return TRUE;
+}
+
+static
+my_bool
+decomp_is_eof(ds_file_t *file)
+{
+	return ((ds_decomp_ctxt_t *) file->ptr)->eof;
+}
+
+static
+int
+decomp_close(ds_file_t *file)
+{
+	ds_decomp_ctxt_t *decomp_ctxt;
+
+	decomp_ctxt = (ds_decomp_ctxt_t *) file->ptr;
+
+	pthread_mutex_lock(&decomp_ctxt->data_mutex);
+
+	decomp_ctxt->eof = TRUE;
+
+	pthread_cond_broadcast(&decomp_ctxt->not_empty_cond);
+
+	pthread_mutex_unlock(&decomp_ctxt->data_mutex);
+
+	pthread_join(decomp_ctxt->id, NULL);
+
+	pthread_mutex_destroy(&decomp_ctxt->data_mutex);
+	pthread_cond_destroy(&decomp_ctxt->not_full_cond);
+	pthread_cond_destroy(&decomp_ctxt->not_empty_cond);
+
+	MY_FREE(file->path);
+	MY_FREE(file);
+
+	return 0;
+}
+
+static
+void
+decomp_set_error(ds_file_t *file)
+{
+	ds_decomp_ctxt_t *decomp_ctxt;
+
+	decomp_ctxt = (ds_decomp_ctxt_t *) file->ptr;
+
+	pthread_mutex_lock(&decomp_ctxt->data_mutex);
+
+	decomp_ctxt->error = TRUE;
+
+	pthread_cond_broadcast(&decomp_ctxt->not_full_cond);
+
+	pthread_mutex_unlock(&decomp_ctxt->data_mutex);
+}
+
+#define DECOMP_READ(file, buf, len) \
+	if (!decomp_read(file, buf, len)) { \
+		msg("decompress: error read %s\n", file->path); \
+		goto error; \
+	}
+
+
+static
+void *
+deqpress_worker_thread_func(void *arg)
+{
+	ds_file_t *file = (ds_file_t *) arg;
+	ds_ctxt_t *ds = ((ds_decomp_ctxt_t *) file->ptr)->ds;
+	char *comp_chunk = NULL, *decomp_chunk = NULL;
+	char tmp[10];
+	ds_file_t *arcfile = NULL;
+	ulong filename_len;
+	char filename[FN_REFLEN];
+	char dirname[FN_REFLEN];
+	size_t dirpath_len;
+
+	my_thread_init();
+
+	DECOMP_READ(file, tmp, 8);
+
+	if (strncmp(tmp, "qpress10", 8) != 0) {
+		msg("decompress: source was not compressed with qpress %s\n",
+			file->path);
+		goto error;
+	}
+
+	DECOMP_READ(file, tmp, 8);
+
+	ulonglong compress_chunk_size = uint8korr(tmp);
+	if (compress_chunk_size > 512*1024*1024) {
+		msg("decompress: source is corrupted: %s\n", file->path);
+		goto error;
+	}
+
+	DECOMP_READ(file, tmp, 1);
+	if (tmp[0] != 'F') {
+		msg("decompress: %s: xbtream can decompress only single-file "
+			"qpress archives for now\n", file->path);
+		goto error;
+	}
+
+	DECOMP_READ(file, tmp, 4);
+
+	filename_len = uint4korr(tmp);
+
+	dirname_part(dirname, file->path, &dirpath_len);
+	char *filename_short = my_malloc(filename_len + 1, MYF(MY_FAE));
+	DECOMP_READ(file, filename_short, filename_len + 1);
+	snprintf(filename, FN_REFLEN,
+		"%s/%s/%s", ds->root, dirname, filename_short);
+	MY_FREE(filename_short);
+
+	MY_STAT mystat;
+	arcfile = ds->datasink->open(ds, filename, &mystat);
+	if (arcfile == NULL) {
+		msg("decompress: failed to create file %s.\n", filename);
+		goto error;
+	}
+
+	comp_chunk = my_malloc((compress_chunk_size +
+		MY_QLZ_COMPRESS_OVERHEAD) * 2, MYF(MY_FAE));
+	decomp_chunk = comp_chunk + compress_chunk_size + MY_QLZ_COMPRESS_OVERHEAD;
+	qlz_state_decompress state;
+
+	while (1) {
+		DECOMP_READ(file, tmp, 8);
+
+		if (strncmp(tmp, "NEWBNEWB", 8) == 0) {
+			DECOMP_READ(file, tmp, 8);
+			DECOMP_READ(file, tmp, 4);
+
+			ulong adler = uint4korr(tmp);
+
+			DECOMP_READ(file, comp_chunk, 9);
+
+			size_t size_compressed =
+				qlz_size_compressed(comp_chunk);
+			if (size_compressed > compress_chunk_size +
+					MY_QLZ_COMPRESS_OVERHEAD) {
+				msg("decompress: chunk is too large %s\n",
+					file->path);
+				goto error;
+			}
+
+			DECOMP_READ(file, comp_chunk + 9, size_compressed - 9);
+
+			ulint adler_read = adler32(0x00000001, comp_chunk,
+				size_compressed);
+			size_t size_decompressed = qlz_decompress(comp_chunk,
+				decomp_chunk, &state);
+
+			if (size_decompressed > COMPRESS_CHUNK_SIZE
+					+ MY_QLZ_COMPRESS_OVERHEAD) {
+				msg("decompress: buffer overflow\n");
+				goto error;
+			}
+
+			if (adler != adler_read) {
+				msg("decompress: corrupted %s, "
+					"checksum mismatch\n",
+					file->path);
+				goto error;
+			}
+
+			ds->datasink->write(arcfile, decomp_chunk,
+				size_decompressed);
+
+			continue;
+		}
+
+		if (strncmp(tmp, "ENDSENDS", 8) == 0) {
+			DECOMP_READ(file, tmp, 8);
+			break;
+		}
+
+		msg("decompress: corrupted %s\n", file->path);
+		goto error;
+	}
+
+	if (decomp_read(file, tmp, 1)) {
+		msg("decompress: error, EOF expected\n");
+		goto error;
+	}
+
+	if (!decomp_is_eof(file)) {
+		msg("decompress: error, EOF expected\n");
+		goto error;
+	}
+
+	if (arcfile) {
+		ds->datasink->close(arcfile);
+	}
+	MY_FREE(comp_chunk);
+	my_thread_end();
+	return NULL;
+
+error:
+	if (arcfile) {
+		ds->datasink->close(arcfile);
+	}
+	decomp_set_error(file);
+	MY_FREE(comp_chunk);
+	my_thread_end();
+	return NULL;
+
+}

=== added file 'src/decompress.h'
--- src/decompress.h 1970-01-01 00:00:00 +0000
+++ src/decompress.h 2012-08-19 12:20:47 +0000
@@ -0,0 +1,28 @@
+/******************************************************
+Copyright (c) 2011 Percona Inc.
+
+Compression interface for XtraBackup.
+
+This program is free software; you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation; version 2 of the License.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program; if not, write to the Free Software
+Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
+*******************************************************/
+
+#ifndef DS_DEDSCOMPRESS_H
+#define DS_DEDSCOMPRESS_H
+
+#include "datasink.h"
+
+extern datasink_t datasink_decompress;
+
+#endif

=== modified file 'src/xbstream.c'
--- src/xbstream.c 2012-02-10 20:05:56 +0000
+++ src/xbstream.c 2012-08-19 12:20:47 +0000
@@ -24,6 +24,7 @@
 #include "common.h"
 #include "xbstream.h"
 #include "local.h"
+#include "decompress.h"
 
 #define XBSTREAM_VERSION "1.0"
 #define XBSTREAM_BUFFER_SIZE (1024 * 1024UL)
@@ -39,6 +40,7 @@
 static run_mode_t opt_mode;
 static char * opt_directory = NULL;
 static my_bool opt_verbose = 0;
+static my_bool opt_decompress = 0;
 
 static struct my_option my_long_options[] =
 {
@@ -54,6 +56,9 @@
 	 GET_STR_ALLOC, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
 	{"verbose", 'v', "Print verbose output.", &opt_verbose, &opt_verbose,
 	 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0},
+	{"decompress", 'z', "Decompress compressed files.",
+	 &opt_decompress, &opt_decompress,
+	 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0},
 
 	{0, 0, 0, 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0}
 };
@@ -362,7 +367,9 @@
 	HASH filehash;
 	file_entry_t *entry;
 	datasink_t *ds;
+	datasink_t *ds_decomp;
 	ds_ctxt_t *ds_ctxt;
+	ds_ctxt_t *ds_ctxt_decomp;
 
 	stream = xb_stream_read_new();
 	if (stream == NULL) {
@@ -372,7 +379,10 @@
 
 	/* If --directory is specified, it is already set as CWD by now. */
 	ds = &datasink_local;
+	ds_decomp = &datasink_decompress;
 	ds_ctxt = ds->init(".");
+	ds_ctxt_decomp = ds_decomp->init(".");
+	ds_ctxt_decomp->pipe_ctxt = ds_ctxt;
 
 	if (my_hash_init(&filehash, &my_charset_bin, START_FILE_HASH_SIZE,
 			0, 0, (my_hash_get_key) get_file_entry_key,
@@ -395,8 +405,18 @@
 				chunk.pathlen);
 
 		if (entry == NULL) {
-			entry = file_entry_new(ds_ctxt, chunk.path,
-					chunk.pathlen);
+			if (opt_decompress &&
+					strlen(chunk.path) > 3 &&
+					strcmp(chunk.path + strlen(chunk.path) - 3,
+						".qp") == 0 &&
+					memcmp(chunk.data, "qpress10", 8) == 0) {
+				entry = file_entry_new(ds_ctxt_decomp, chunk.path,
+						chunk.pathlen);
+			} else {
+				entry = file_entry_new(ds_ctxt, chunk.path,
+						chunk.pathlen);
+
+			}
 			if (entry == NULL) {
 				goto err;
 			}
@@ -420,7 +440,7 @@
 			goto err;
 		}
 
-		if (ds->write(entry->file, chunk.data, chunk.length)) {
+		if (entry->ds_ctxt->datasink->write(entry->file, chunk.data, chunk.length)) {
 			msg("%s: my_write() failed.\n", my_progname);
 			goto err;
 		}
@@ -434,12 +454,14 @@
 
 	my_hash_free(&filehash);
 	ds->deinit(ds_ctxt);
+	ds_decomp->deinit(ds_ctxt_decomp);
 	xb_stream_read_done(stream);
 
 	return 0;
 err:
 	my_hash_free(&filehash);
 	ds->deinit(ds_ctxt);
+	ds_decomp->deinit(ds_ctxt_decomp);
 	xb_stream_read_done(stream);
 
 	return 1;

=== added file 'test/t/ib_stream_decompress.sh'
--- test/t/ib_stream_decompress.sh 1970-01-01 00:00:00 +0000
+++ test/t/ib_stream_decompress.sh 2012-08-19 12:20:47 +0000
@@ -0,0 +1,9 @@
+############################################################################
+# Test streaming + compression (decompression with xbstream)
+############################################################################
+
+stream_format=xbstream
+stream_extract_cmd="xbstream -xvz <"
+innobackupex_options="--compress --compress-threads=4 --parallel=4"
+
+. inc/ib_stream_common.sh
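
The hand-off between decomp_write() and decomp_read() in src/decompress.c above is a bounded ring buffer guarded by a mutex and two condition variables. A reduced, standalone sketch of that pattern follows, with some assumptions: ring_t and the ring_* functions are hypothetical names, the buffer is deliberately tiny, there is no error flag, and the writer signals on every write rather than only once the buffer is more than one fifth full as the patch does.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>
#include <string.h>

#define RB_SIZE 16	/* tiny on purpose; the patch uses 10 * 64 KiB */

typedef struct {
	pthread_mutex_t mutex;
	pthread_cond_t not_full, not_empty;
	char data[RB_SIZE];
	size_t rpos, wpos, avail;
	int eof;
} ring_t;

void ring_init(ring_t *r)
{
	assert(r != NULL);
	memset(r, 0, sizeof *r);
	pthread_mutex_init(&r->mutex, NULL);
	pthread_cond_init(&r->not_full, NULL);
	pthread_cond_init(&r->not_empty, NULL);
}

/* Producer side: blocks while the buffer is full. */
void ring_write(ring_t *r, const char *buf, size_t len)
{
	pthread_mutex_lock(&r->mutex);
	while (len > 0) {
		size_t n;

		while (r->avail == RB_SIZE)
			pthread_cond_wait(&r->not_full, &r->mutex);
		n = RB_SIZE - r->avail;
		if (n > len)
			n = len;
		if (n > RB_SIZE - r->wpos)
			n = RB_SIZE - r->wpos;	/* copy up to the wrap point */
		memcpy(r->data + r->wpos, buf, n);
		r->wpos = (r->wpos + n) % RB_SIZE;
		r->avail += n;
		buf += n;
		len -= n;
		pthread_cond_signal(&r->not_empty);
	}
	pthread_mutex_unlock(&r->mutex);
}

/* Producer side: mark end of input and wake up a waiting reader. */
void ring_close(ring_t *r)
{
	pthread_mutex_lock(&r->mutex);
	r->eof = 1;
	pthread_cond_broadcast(&r->not_empty);
	pthread_mutex_unlock(&r->mutex);
}

/* Consumer side: returns 1 once len bytes were read, 0 if EOF came first. */
int ring_read(ring_t *r, char *buf, size_t len)
{
	pthread_mutex_lock(&r->mutex);
	while (len > 0) {
		size_t n;

		while (r->avail == 0 && !r->eof)
			pthread_cond_wait(&r->not_empty, &r->mutex);
		if (r->avail == 0) {
			pthread_mutex_unlock(&r->mutex);
			return 0;
		}
		n = r->avail < len ? r->avail : len;
		if (n > RB_SIZE - r->rpos)
			n = RB_SIZE - r->rpos;
		memcpy(buf, r->data + r->rpos, n);
		r->rpos = (r->rpos + n) % RB_SIZE;
		r->avail -= n;
		buf += n;
		len -= n;
		pthread_cond_signal(&r->not_full);
	}
	pthread_mutex_unlock(&r->mutex);
	return 1;
}

/* Demo producer: writes one message, then closes the ring. */
const char demo_msg[] = "the quick brown fox jumps over the lazy dog";

void *demo_producer(void *arg)
{
	ring_t *r = arg;

	ring_write(r, demo_msg, sizeof demo_msg - 1);
	ring_close(r);
	return NULL;
}
```

Because the message is longer than RB_SIZE, the demo producer has to block and wrap around several times while a reader drains the buffer, which is the situation the two condition variables exist for.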
