Merge lp:~sergei.glushchenko/percona-xtrabackup/20-decompress into lp:percona-xtrabackup/2.0

Proposed by Sergei Glushchenko
Status: Rejected
Rejected by: Alexey Kopytov
Proposed branch: lp:~sergei.glushchenko/percona-xtrabackup/20-decompress
Merge into: lp:percona-xtrabackup/2.0
Diff against target: 634 lines (+518/-5)
6 files modified
src/Makefile (+2/-2)
src/datasink.h (+1/-0)
src/decompress.c (+453/-0)
src/decompress.h (+28/-0)
src/xbstream.c (+25/-3)
test/t/ib_stream_decompress.sh (+9/-0)
To merge this branch: bzr merge lp:~sergei.glushchenko/percona-xtrabackup/20-decompress
Reviewer | Review Type | Date Requested | Status
Alexey Kopytov (community) | | | Disapprove
Review via email: mp+120287@code.launchpad.net

Description of the change

BP https://blueprints.launchpad.net/percona-xtrabackup/+spec/xbstream-decompress
  Support for decompression in xbstream. A new option, -z, has been added to
  automatically decompress .qp files inside the stream. A decompress datasink
  has been implemented for extracting qpress10 single-file archives.
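
The header that the patch's worker thread reads from a .qp file before the first data chunk can be sketched as below. This is a minimal, standalone illustration inferred only from the reads in src/decompress.c in this proposal, not from a qpress specification. The names qp_header_t, read_le, and qp_parse_header are hypothetical, and the trailing NUL after the file name is an assumption based on the patch reading filename_len + 1 bytes.

```c
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Fields the patch reads before the first data chunk (hypothetical struct). */
typedef struct {
	uint64_t chunk_size;  /* 8-byte little-endian value after the magic */
	uint32_t name_len;    /* 4-byte little-endian file name length */
	const char *name;     /* points into the input buffer */
	size_t header_len;    /* number of bytes the header occupies */
} qp_header_t;

/* Decode an n-byte little-endian unsigned integer. */
static uint64_t read_le(const unsigned char *p, size_t n)
{
	uint64_t v = 0;
	while (n-- > 0) {
		v = (v << 8) | p[n];
	}
	return v;
}

/* Returns 0 on success, -1 if the buffer is too short or malformed. */
static int qp_parse_header(const unsigned char *buf, size_t len,
			   qp_header_t *out)
{
	size_t pos = 8;

	/* Magic (8) + chunk size (8) + 'F' marker (1) + name length (4). */
	if (len < 8 + 8 + 1 + 4 || memcmp(buf, "qpress10", 8) != 0) {
		return -1;
	}
	out->chunk_size = read_le(buf + pos, 8);
	pos += 8;
	/* The patch rejects chunk sizes above 512 MiB as corruption. */
	if (out->chunk_size > 512UL * 1024 * 1024) {
		return -1;
	}
	/* Only single-file archives ('F') are handled by the patch. */
	if (buf[pos++] != 'F') {
		return -1;
	}
	out->name_len = (uint32_t) read_le(buf + pos, 4);
	pos += 4;
	/* Assumption: the name is followed by one NUL byte. */
	if (out->name_len >= len - pos || buf[pos + out->name_len] != '\0') {
		return -1;
	}
	out->name = (const char *) (buf + pos);
	out->header_len = pos + out->name_len + 1;
	return 0;
}
```

Unlike the patch, this sketch bounds the name length against the available input before using it, which is one way to avoid trusting a length field taken from the stream.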

#24660

Stewart Smith (stewart) wrote :

I have three thoughts:
- Should we add this to 2.0 at all? It is adding a feature and a 2.1 release probably isn't far off.
- I think that certainly in 2.1, this should be default behaviour, as it's almost certainly what the user wanted.
- parallel decompression?

Sergei Glushchenko (sergei.glushchenko) wrote :

> I have three thoughts:
> - Should we add this to 2.0 at all? It is adding a feature and a 2.1 release
> probably isn't far off.

We have a customer using 2.0 who wants this feature.

> - I think that certainly in 2.1, this should be default behaviour, as it's
> almost certainly what the user wanted.

Yes, you're probably right.

> - parallel decompression?

I've experimented with parallel decompression, implementing it similarly to parallel compression, and it didn't give me any performance gain. It was even slower than the thread-per-file implementation. Do you have any specific proposals for how to implement parallel decompression?

Stewart Smith (stewart) wrote :

> > - parallel decompression?
>
> I've experimented with parallel decompression, implementing it similarly to
> parallel compression, and it didn't give me any performance gain. It was even
> slower than the thread-per-file implementation. Do you have any specific
> proposals for how to implement parallel decompression?

Thread per file is fine.

Alexey Kopytov (akopytov) wrote :

As discussed on IRC, the feature will be implemented by adding a --decompress-program option to xbstream so that an external utility such as qpress can be used.

Alexey Kopytov (akopytov) wrote :

After further analysis and discussion, it was decided that this feature has no real applications, while it complicates the xbstream architecture significantly.

Users who want compression only to speed up transferring the entire archive to another host do not really need built-in compression in XtraBackup. That feature was implemented for different use cases. Instead, they can use a normal uncompressed stream in XtraBackup (either tar, or xbstream for parallel or incremental backups) and then use external compression utilities (single- or multi-threaded) to compress the stream and decompress it on the remote host.

review: Disapprove

Unmerged revisions

462. By Sergei Glushchenko

BP https://blueprints.launchpad.net/percona-xtrabackup/+spec/xbstream-decompress
Support for decompression in xbstream. A new option, -z, has been added to
automatically decompress .qp files inside the stream. A decompress datasink
has been implemented for extracting qpress10 single-file archives.

Preview Diff

1=== modified file 'src/Makefile'
2--- src/Makefile 2012-02-10 20:05:56 +0000
3+++ src/Makefile 2012-08-19 12:20:47 +0000
4@@ -24,7 +24,7 @@
5
6 COMMON_INC = -I. -I libarchive/libarchive -I quicklz
7 XTRABACKUPOBJS = xtrabackup.o stream.o local.o compress.o xbstream_write.o \
8- quicklz/quicklz.o
9+ quicklz/quicklz.o decompress.o
10 XBSTREAMOBJS = xbstream.o xbstream_write.o xbstream_read.o
11
12 LIBARCHIVE_A = libarchive/libarchive/libarchive.a
13@@ -176,7 +176,7 @@
14 xbstream.o xbstream_read.o: %.o: %.c
15 $(CC) $(CFLAGS) $(INC) $(DEFS) -c $< -o $@
16
17-xbstream: $(XBSTREAMOBJS) $(MYSQLOBJS) local.o
18+xbstream: $(XBSTREAMOBJS) $(MYSQLOBJS) local.o decompress.o quicklz/quicklz.o
19 $(CC) $(CFLAGS) $^ $(INC) $(MYSQLOBJS) $(LIBS) -o $@
20
21 xtrabackup.o: xtrabackup.c xb_regex.h
22
23=== modified file 'src/datasink.h'
24--- src/datasink.h 2012-02-10 20:05:56 +0000
25+++ src/datasink.h 2012-08-19 12:20:47 +0000
26@@ -30,6 +30,7 @@
27 struct datasink_struct *datasink;
28 char *root;
29 void *ptr;
30+ struct ds_ctxt *pipe_ctxt;
31 } ds_ctxt_t;
32
33 typedef struct {
34
35=== added file 'src/decompress.c'
36--- src/decompress.c 1970-01-01 00:00:00 +0000
37+++ src/decompress.c 2012-08-19 12:20:47 +0000
38@@ -0,0 +1,453 @@
39+/******************************************************
40+Copyright (c) 2011 Percona Inc.
41+
42+Decompressing datasink implementation for XtraBackup.
43+
44+This program is free software; you can redistribute it and/or modify
45+it under the terms of the GNU General Public License as published by
46+the Free Software Foundation; version 2 of the License.
47+
48+This program is distributed in the hope that it will be useful,
49+but WITHOUT ANY WARRANTY; without even the implied warranty of
50+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
51+GNU General Public License for more details.
52+
53+You should have received a copy of the GNU General Public License
54+along with this program; if not, write to the Free Software
55+Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
56+
57+*******************************************************/
58+
59+
60+#include <my_base.h>
61+#include <quicklz.h>
62+#include <univ.i>
63+#include <zlib.h>
64+#include "common.h"
65+#include "datasink.h"
66+
67+#define COMPRESS_CHUNK_SIZE (64 * 1024UL)
68+#define MY_QLZ_COMPRESS_OVERHEAD 400
69+
70+#define DS_DECOMPRESS_BUF_SIZE (COMPRESS_CHUNK_SIZE * 10UL)
71+
72+typedef struct {
73+ pthread_t id;
74+ pthread_mutex_t data_mutex;
75+ pthread_cond_t not_full_cond;
76+ pthread_cond_t not_empty_cond;
77+ char data[DS_DECOMPRESS_BUF_SIZE];
78+ char* rptr;
79+ char* wptr;
80+ size_t bytes_avail;
81+ size_t data_size;
82+ my_bool eof;
83+ my_bool error;
84+ ds_ctxt_t *ds;
85+} ds_decomp_ctxt_t;
86+
87+static
88+void *
89+deqpress_worker_thread_func(void *arg);
90+
91+static ds_ctxt_t *decomp_init(const char *root);
92+static ds_file_t *decomp_open(ds_ctxt_t *ctxt, const char *path,
93+ MY_STAT *mystat);
94+static int decomp_write(ds_file_t *file, const void *buf, size_t len);
95+static int decomp_close(ds_file_t *file);
96+static void decomp_deinit(ds_ctxt_t *ctxt);
97+
98+datasink_t datasink_decompress = {
99+ &decomp_init,
100+ &decomp_open,
101+ &decomp_write,
102+ &decomp_close,
103+ &decomp_deinit
104+};
105+
106+static
107+ds_ctxt_t *
108+decomp_init(const char *root)
109+{
110+ ds_ctxt_t *ctxt;
111+
112+ ctxt = (ds_ctxt_t *) my_malloc(sizeof(ds_ctxt_t),
113+ MYF(MY_FAE));
114+
115+ ctxt->ptr = NULL;
116+ ctxt->root = my_strdup(root, MYF(MY_FAE));
117+ ctxt->datasink = &datasink_decompress;
118+
119+ return ctxt;
120+}
121+
122+static
123+void
124+decomp_deinit(ds_ctxt_t *ctxt)
125+{
126+ MY_FREE(ctxt->root);
127+ MY_FREE(ctxt);
128+}
129+
130+static
131+ds_file_t *
132+decomp_open(ds_ctxt_t *ctxt, const char *path,
133+ MY_STAT *mystat __attribute__((__unused__)))
134+{
135+ ds_decomp_ctxt_t *decomp_ctxt;
136+ ds_file_t *file;
137+
138+ xb_a(ctxt->pipe_ctxt != NULL);
139+
140+ file = (ds_file_t *) my_malloc(sizeof(ds_file_t) +
141+ sizeof(ds_decomp_ctxt_t),
142+ MYF(MY_FAE));
143+
144+ decomp_ctxt = (ds_decomp_ctxt_t *) (file + 1);
145+ file->ptr = decomp_ctxt;
146+ file->path = my_strdup(path, MYF(MY_FAE));
147+
148+ decomp_ctxt->rptr =
149+ decomp_ctxt->wptr = decomp_ctxt->data;
150+ decomp_ctxt->eof = decomp_ctxt->error = FALSE;
151+ decomp_ctxt->bytes_avail = 0;
152+ decomp_ctxt->data_size = DS_DECOMPRESS_BUF_SIZE;
153+ decomp_ctxt->ds = (ds_ctxt_t *) ctxt->pipe_ctxt;
154+
155+ pthread_mutex_init(&decomp_ctxt->data_mutex, NULL);
156+ pthread_cond_init(&decomp_ctxt->not_full_cond, NULL);
157+ pthread_cond_init(&decomp_ctxt->not_empty_cond, NULL);
158+
159+ if (pthread_create(&decomp_ctxt->id, NULL,
160+ deqpress_worker_thread_func, file)) {
161+ msg("decompress: pthread_create() failed: "
162+ "errno = %d\n", errno);
163+ return NULL;
164+ }
165+
166+ return file;
167+}
168+
169+static
170+int
171+decomp_write(ds_file_t *file, const void *buf, size_t len)
172+{
173+ ds_decomp_ctxt_t *decomp_ctxt;
174+ const char *ptr = (const char*) buf;
175+
176+ decomp_ctxt = (ds_decomp_ctxt_t *) file->ptr;
177+
178+ while (len > 0) {
179+
180+ size_t n;
181+
182+ pthread_mutex_lock(&decomp_ctxt->data_mutex);
183+
184+ while ((decomp_ctxt->bytes_avail
185+ == decomp_ctxt->data_size) &&
186+ !decomp_ctxt->error) {
187+ pthread_cond_wait(
188+ &decomp_ctxt->not_full_cond,
189+ &decomp_ctxt->data_mutex);
190+ }
191+
192+ if (decomp_ctxt->error) {
193+ pthread_mutex_unlock(&decomp_ctxt->data_mutex);
194+ return 1;
195+ }
196+
197+ n = len < (decomp_ctxt->data_size - decomp_ctxt->bytes_avail) ?
198+ len : (decomp_ctxt->data_size - decomp_ctxt->bytes_avail);
199+
200+ if (decomp_ctxt->wptr + n >
201+ decomp_ctxt->data + decomp_ctxt->data_size) {
202+ n = decomp_ctxt->data + decomp_ctxt->data_size -
203+ decomp_ctxt->wptr;
204+ }
205+
206+ memcpy(decomp_ctxt->wptr, ptr, n);
207+
208+ len -= n;
209+ ptr += n;
210+ decomp_ctxt->wptr += n;
211+ decomp_ctxt->bytes_avail += n;
212+
213+ if (decomp_ctxt->wptr ==
214+ decomp_ctxt->data + decomp_ctxt->data_size) {
215+ decomp_ctxt->wptr = decomp_ctxt->data;
216+ }
217+
218+ if (decomp_ctxt->bytes_avail > decomp_ctxt->data_size / 5)
219+ pthread_cond_signal(&decomp_ctxt->not_empty_cond);
220+
221+ pthread_mutex_unlock(&decomp_ctxt->data_mutex);
222+
223+ }
224+
225+ return 0;
226+}
227+
228+static
229+my_bool
230+decomp_read(ds_file_t *file, void *buf, size_t len)
231+{
232+ ds_decomp_ctxt_t *decomp_ctxt;
233+ char *ptr = (char*) buf;
234+
235+ decomp_ctxt = (ds_decomp_ctxt_t *) file->ptr;
236+
237+ while (len > 0) {
238+
239+ size_t n;
240+
241+ pthread_mutex_lock(&decomp_ctxt->data_mutex);
242+
243+ while (decomp_ctxt->bytes_avail == 0 &&
244+ !decomp_ctxt->eof) {
245+ pthread_cond_wait(
246+ &decomp_ctxt->not_empty_cond,
247+ &decomp_ctxt->data_mutex);
248+ }
249+
250+ if (decomp_ctxt->bytes_avail == 0 &&
251+ decomp_ctxt->eof) {
252+ pthread_mutex_unlock(&decomp_ctxt->data_mutex);
253+ return FALSE;
254+ }
255+
256+ n = len < decomp_ctxt->bytes_avail ?
257+ len : decomp_ctxt->bytes_avail;
258+
259+
260+ if (decomp_ctxt->rptr + n >
261+ decomp_ctxt->data + decomp_ctxt->data_size) {
262+ n = decomp_ctxt->data + decomp_ctxt->data_size -
263+ decomp_ctxt->rptr;
264+ }
265+
266+ if (ptr) {
267+ memcpy(ptr, decomp_ctxt->rptr, n);
268+ ptr += n;
269+ }
270+
271+ len -= n;
272+ decomp_ctxt->rptr += n;
273+ decomp_ctxt->bytes_avail -= n;
274+
275+ if (decomp_ctxt->rptr ==
276+ decomp_ctxt->data + decomp_ctxt->data_size) {
277+ decomp_ctxt->rptr = decomp_ctxt->data;
278+ }
279+
280+ pthread_cond_signal(&decomp_ctxt->not_full_cond);
281+
282+ pthread_mutex_unlock(&decomp_ctxt->data_mutex);
283+
284+ }
285+
286+ return TRUE;
287+}
288+
289+static
290+my_bool
291+decomp_is_eof(ds_file_t *file)
292+{
293+ return ((ds_decomp_ctxt_t *) file->ptr)->eof;
294+}
295+
296+static
297+int
298+decomp_close(ds_file_t *file)
299+{
300+ ds_decomp_ctxt_t *decomp_ctxt;
301+
302+ decomp_ctxt = (ds_decomp_ctxt_t *) file->ptr;
303+
304+ pthread_mutex_lock(&decomp_ctxt->data_mutex);
305+
306+ decomp_ctxt->eof = TRUE;
307+
308+ pthread_cond_broadcast(&decomp_ctxt->not_empty_cond);
309+
310+ pthread_mutex_unlock(&decomp_ctxt->data_mutex);
311+
312+ pthread_join(decomp_ctxt->id, NULL);
313+
314+ pthread_mutex_destroy(&decomp_ctxt->data_mutex);
315+ pthread_cond_destroy(&decomp_ctxt->not_full_cond);
316+ pthread_cond_destroy(&decomp_ctxt->not_empty_cond);
317+
318+ MY_FREE(file->path);
319+ MY_FREE(file);
320+
321+ return 0;
322+}
323+
324+static
325+void
326+decomp_set_error(ds_file_t *file)
327+{
328+ ds_decomp_ctxt_t *decomp_ctxt;
329+
330+ decomp_ctxt = (ds_decomp_ctxt_t *) file->ptr;
331+
332+ pthread_mutex_lock(&decomp_ctxt->data_mutex);
333+
334+ decomp_ctxt->error = TRUE;
335+
336+ pthread_cond_broadcast(&decomp_ctxt->not_full_cond);
337+
338+ pthread_mutex_unlock(&decomp_ctxt->data_mutex);
339+}
340+
341+#define DECOMP_READ(file, buf, len) \
342+ if (!decomp_read(file, buf, len)) { \
343+ msg("decompress: error read %s\n", file->path); \
344+ goto error; \
345+ }
346+
347+
348+static
349+void *
350+deqpress_worker_thread_func(void *arg)
351+{
352+ ds_file_t *file = (ds_file_t *) arg;
353+ ds_ctxt_t *ds = ((ds_decomp_ctxt_t *) file->ptr)->ds;
354+ char *comp_chunk = NULL, *decomp_chunk = NULL;
355+ char tmp[10];
356+ ds_file_t *arcfile = NULL;
357+ ulong filename_len;
358+ char filename[FN_REFLEN];
359+ char dirname[FN_REFLEN];
360+ size_t dirpath_len;
361+
362+ my_thread_init();
363+
364+ DECOMP_READ(file, tmp, 8);
365+
366+ if (strncmp(tmp, "qpress10", 8) != 0) {
367+ msg("decompress: source was not compressed with qpress %s\n",
368+ file->path);
369+ goto error;
370+ }
371+
372+ DECOMP_READ(file, tmp, 8);
373+
374+ ulonglong compress_chunk_size = uint8korr(tmp);
375+ if (compress_chunk_size > 512*1024*1024) {
376+ msg("decompress: source is corrupted: %s\n", file->path);
377+ goto error;
378+ }
379+
380+ DECOMP_READ(file, tmp, 1);
381+ if (tmp[0] != 'F') {
382+ msg("decompress: %s: xbstream can decompress only single-file "
383+ "qpress archives for now\n", file->path);
384+ goto error;
385+ }
386+
387+ DECOMP_READ(file, tmp, 4);
388+
389+ filename_len = uint4korr(tmp);
390+
391+ dirname_part(dirname, file->path, &dirpath_len);
392+ char *filename_short = my_malloc(filename_len + 1, MYF(MY_FAE));
393+ DECOMP_READ(file, filename_short, filename_len + 1);
394+ snprintf(filename, FN_REFLEN,
395+ "%s/%s/%s", ds->root, dirname, filename_short);
396+ MY_FREE(filename_short);
397+
398+ MY_STAT mystat;
399+ arcfile = ds->datasink->open(ds, filename, &mystat);
400+ if (arcfile == NULL) {
401+ msg("decompress: failed to create file %s.\n", filename);
402+ goto error;
403+ }
404+
405+ comp_chunk = my_malloc((compress_chunk_size +
406+ MY_QLZ_COMPRESS_OVERHEAD) * 2, MYF(MY_FAE));
407+ decomp_chunk = comp_chunk + compress_chunk_size + MY_QLZ_COMPRESS_OVERHEAD;
408+ qlz_state_decompress state;
409+
410+ while (1) {
411+ DECOMP_READ(file, tmp, 8);
412+
413+ if (strncmp(tmp, "NEWBNEWB", 8) == 0) {
414+ DECOMP_READ(file, tmp, 8);
415+ DECOMP_READ(file, tmp, 4);
416+
417+ ulong adler = uint4korr(tmp);
418+
419+ DECOMP_READ(file, comp_chunk, 9);
420+
421+ size_t size_compressed =
422+ qlz_size_compressed(comp_chunk);
423+ if (size_compressed < 9 || size_compressed >
424+ compress_chunk_size + MY_QLZ_COMPRESS_OVERHEAD) {
425+ msg("decompress: invalid chunk size %s\n",
426+ file->path);
427+ goto error;
428+ }
429+
430+ DECOMP_READ(file, comp_chunk + 9, size_compressed - 9);
431+
432+ ulint adler_read = adler32(0x00000001, comp_chunk,
433+ size_compressed);
434+ size_t size_decompressed =
435+ qlz_size_decompressed(comp_chunk);
436+ /* Check the declared size before writing into decomp_chunk. */
437+ if (size_decompressed > compress_chunk_size
438+ + MY_QLZ_COMPRESS_OVERHEAD) {
439+ msg("decompress: buffer overflow\n");
440+ goto error;
441+ }
442+ qlz_decompress(comp_chunk, decomp_chunk, &state);
443+ if (adler != adler_read) {
444+ msg("decompress: corrupted %s, "
445+ "checksum mismatch\n",
446+ file->path);
447+ goto error;
448+ }
449+
450+ ds->datasink->write(arcfile, decomp_chunk,
451+ size_decompressed);
452+
453+ continue;
454+ }
455+
456+ if (strncmp(tmp, "ENDSENDS", 8) == 0) {
457+ DECOMP_READ(file, tmp, 8);
458+ break;
459+ }
460+
461+ msg("decompress: corrupted %s\n", file->path);
462+ goto error;
463+ }
464+
465+ if (decomp_read(file, tmp, 1)) {
466+ msg("decompress: error, EOF expected\n");
467+ goto error;
468+ }
469+
470+ if (!decomp_is_eof(file)) {
471+ msg("decompress: error, EOF expected\n");
472+ goto error;
473+ }
474+
475+ if (arcfile) {
476+ ds->datasink->close(arcfile);
477+ }
478+ MY_FREE(comp_chunk);
479+ my_thread_end();
480+ return NULL;
481+
482+error:
483+ if (arcfile) {
484+ ds->datasink->close(arcfile);
485+ }
486+ decomp_set_error(file);
487+ MY_FREE(comp_chunk);
488+ my_thread_end();
489+ return NULL;
490+
491+}
492
493=== added file 'src/decompress.h'
494--- src/decompress.h 1970-01-01 00:00:00 +0000
495+++ src/decompress.h 2012-08-19 12:20:47 +0000
496@@ -0,0 +1,28 @@
497+/******************************************************
498+Copyright (c) 2011 Percona Inc.
499+
500+Decompression interface for XtraBackup.
501+
502+This program is free software; you can redistribute it and/or modify
503+it under the terms of the GNU General Public License as published by
504+the Free Software Foundation; version 2 of the License.
505+
506+This program is distributed in the hope that it will be useful,
507+but WITHOUT ANY WARRANTY; without even the implied warranty of
508+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
509+GNU General Public License for more details.
510+
511+You should have received a copy of the GNU General Public License
512+along with this program; if not, write to the Free Software
513+Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
514+
515+*******************************************************/
516+
517+#ifndef DS_DECOMPRESS_H
518+#define DS_DECOMPRESS_H
519+
520+#include "datasink.h"
521+
522+extern datasink_t datasink_decompress;
523+
524+#endif
525
526=== modified file 'src/xbstream.c'
527--- src/xbstream.c 2012-02-10 20:05:56 +0000
528+++ src/xbstream.c 2012-08-19 12:20:47 +0000
529@@ -24,6 +24,7 @@
530 #include "common.h"
531 #include "xbstream.h"
532 #include "local.h"
533+#include "decompress.h"
534
535 #define XBSTREAM_VERSION "1.0"
536 #define XBSTREAM_BUFFER_SIZE (1024 * 1024UL)
537@@ -39,6 +40,7 @@
538 static run_mode_t opt_mode;
539 static char * opt_directory = NULL;
540 static my_bool opt_verbose = 0;
541+static my_bool opt_decompress = 0;
542
543 static struct my_option my_long_options[] =
544 {
545@@ -54,6 +56,9 @@
546 GET_STR_ALLOC, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
547 {"verbose", 'v', "Print verbose output.", &opt_verbose, &opt_verbose,
548 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0},
549+ {"decompress", 'z', "Decompress compressed files.",
550+ &opt_decompress, &opt_decompress,
551+ 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0},
552
553 {0, 0, 0, 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0}
554 };
555@@ -362,7 +367,9 @@
556 HASH filehash;
557 file_entry_t *entry;
558 datasink_t *ds;
559+ datasink_t *ds_decomp;
560 ds_ctxt_t *ds_ctxt;
561+ ds_ctxt_t *ds_ctxt_decomp;
562
563 stream = xb_stream_read_new();
564 if (stream == NULL) {
565@@ -372,7 +379,10 @@
566
567 /* If --directory is specified, it is already set as CWD by now. */
568 ds = &datasink_local;
569+ ds_decomp = &datasink_decompress;
570 ds_ctxt = ds->init(".");
571+ ds_ctxt_decomp = ds_decomp->init(".");
572+ ds_ctxt_decomp->pipe_ctxt = ds_ctxt;
573
574 if (my_hash_init(&filehash, &my_charset_bin, START_FILE_HASH_SIZE,
575 0, 0, (my_hash_get_key) get_file_entry_key,
576@@ -395,8 +405,18 @@
577 chunk.pathlen);
578
579 if (entry == NULL) {
580- entry = file_entry_new(ds_ctxt, chunk.path,
581- chunk.pathlen);
582+ if (opt_decompress &&
583+ strlen(chunk.path) > 3 &&
584+ strcmp(chunk.path + strlen(chunk.path) - 3,
585+ ".qp") == 0 &&
586+ chunk.length >= 8 && memcmp(chunk.data, "qpress10", 8) == 0) {
587+ entry = file_entry_new(ds_ctxt_decomp, chunk.path,
588+ chunk.pathlen);
589+ } else {
590+ entry = file_entry_new(ds_ctxt, chunk.path,
591+ chunk.pathlen);
592+
593+ }
594 if (entry == NULL) {
595 goto err;
596 }
597@@ -420,7 +440,7 @@
598 goto err;
599 }
600
601- if (ds->write(entry->file, chunk.data, chunk.length)) {
602+ if (entry->ds_ctxt->datasink->write(entry->file, chunk.data, chunk.length)) {
603 msg("%s: my_write() failed.\n", my_progname);
604 goto err;
605 }
606@@ -434,12 +454,14 @@
607
608 my_hash_free(&filehash);
609 ds->deinit(ds_ctxt);
610+ ds_decomp->deinit(ds_ctxt_decomp);
611 xb_stream_read_done(stream);
612
613 return 0;
614 err:
615 my_hash_free(&filehash);
616 ds->deinit(ds_ctxt);
617+ ds_decomp->deinit(ds_ctxt_decomp);
618 xb_stream_read_done(stream);
619
620 return 1;
621
622=== added file 'test/t/ib_stream_decompress.sh'
623--- test/t/ib_stream_decompress.sh 1970-01-01 00:00:00 +0000
624+++ test/t/ib_stream_decompress.sh 2012-08-19 12:20:47 +0000
625@@ -0,0 +1,9 @@
626+############################################################################
627+# Test streaming + compression (decompression with xbstream)
628+############################################################################
629+
630+stream_format=xbstream
631+stream_extract_cmd="xbstream -xvz <"
632+innobackupex_options="--compress --compress-threads=4 --parallel=4"
633+
634+. inc/ib_stream_common.sh
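
The hand-off between decomp_write and decomp_read in the diff above relies on a fixed-size circular buffer: each copy stops at the end of the array and the position then wraps to the start. The single-threaded sketch below illustrates only that wrap-around arithmetic. It uses indices instead of pointers and omits the mutex and condition variables the patch uses for blocking. ring_t, ring_put, ring_get, and RING_SIZE are hypothetical names, not part of the patch.

```c
#include <stddef.h>
#include <string.h>

#define RING_SIZE 8  /* tiny on purpose; the patch uses 10 * 64 KiB */

typedef struct {
	char data[RING_SIZE];
	size_t rpos;   /* index of the next byte to read */
	size_t wpos;   /* index of the next byte to write */
	size_t avail;  /* number of bytes currently stored */
} ring_t;

static void ring_init(ring_t *r)
{
	r->rpos = r->wpos = r->avail = 0;
}

/* Copies up to len bytes into the ring; returns how many were accepted. */
static size_t ring_put(ring_t *r, const char *src, size_t len)
{
	size_t done = 0;

	while (done < len && r->avail < RING_SIZE) {
		size_t n = len - done;
		if (n > RING_SIZE - r->avail) {
			n = RING_SIZE - r->avail;  /* limited by free space */
		}
		if (n > RING_SIZE - r->wpos) {
			n = RING_SIZE - r->wpos;   /* stop at end of array */
		}
		memcpy(r->data + r->wpos, src + done, n);
		r->wpos = (r->wpos + n) % RING_SIZE;  /* wrap to the start */
		r->avail += n;
		done += n;
	}
	return done;
}

/* Copies up to len bytes out of the ring; returns how many were read. */
static size_t ring_get(ring_t *r, char *dst, size_t len)
{
	size_t done = 0;

	while (done < len && r->avail > 0) {
		size_t n = len - done;
		if (n > r->avail) {
			n = r->avail;              /* limited by stored data */
		}
		if (n > RING_SIZE - r->rpos) {
			n = RING_SIZE - r->rpos;   /* stop at end of array */
		}
		memcpy(dst + done, r->data + r->rpos, n);
		r->rpos = (r->rpos + n) % RING_SIZE;
		r->avail -= n;
		done += n;
	}
	return done;
}
```

In the patch, the writer waits on a condition variable where this sketch returns a short count from ring_put, and the reader waits where ring_get returns early.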
