Merge lp:~sergei.glushchenko/percona-xtrabackup/20-decompress into lp:percona-xtrabackup/2.0
- 20-decompress
- Merge into 2.0
Status: | Rejected |
---|---|
Rejected by: | Alexey Kopytov |
Proposed branch: | lp:~sergei.glushchenko/percona-xtrabackup/20-decompress |
Merge into: | lp:percona-xtrabackup/2.0 |
Diff against target: |
634 lines (+518/-5) 6 files modified
src/Makefile (+2/-2) src/datasink.h (+1/-0) src/decompress.c (+453/-0) src/decompress.h (+28/-0) src/xbstream.c (+25/-3) test/t/ib_stream_decompress.sh (+9/-0) |
To merge this branch: | bzr merge lp:~sergei.glushchenko/percona-xtrabackup/20-decompress |
Related bugs: | |
Related blueprints: |
Support for decompression in xbstream
(Undefined)
|
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Alexey Kopytov (community) | Disapprove | ||
Review via email: mp+120287@code.launchpad.net |
Commit message
Description of the change
BP https:/
Support for decompression in xbstream. New option -z been added to
automatically decompress .qp files inside the stream. decompress datasink
been implemented for qpress10 single file archives extraction.
#24660
Sergei Glushchenko (sergei.glushchenko) wrote : | # |
Stewart Smith (stewart) wrote : | # |
I have three thoughts:
- Should we add this to 2.0 at all? It is adding a feature and a 2.1 release probably isn't far off.
- I think that certainly in 2.1, this should be default behaviour, as it's almost certainly what the user wanted.
- parallel decompression?
Sergei Glushchenko (sergei.glushchenko) wrote : | # |
> I have three thoughts:
> - Should we add this to 2.0 at all? It is adding a feature and a 2.1 release
> probably isn't far off.
We have customer using 2.0, who want this feature.
> - I think that certainly in 2.1, this should be default behaviour, as it's
> almost certainly what the user wanted.
Yes, probably you're right.
> - parallel decompression?
I've experimented with parallel decompression, implementing it similar to parallel compression and it didn't gave me any performance gain. It was even slower than thread-per-file implementation. Do you have any specific proposals how to implement parallel decompression?
Stewart Smith (stewart) wrote : | # |
> > - parallel decompression?
>
> I've experimented with parallel decompression, implementing it similar to
> parallel compression and it didn't gave me any performance gain. It was even
> slower than thread-per-file implementation. Do you have any specific proposals
> how to implement parallel decompression?
Thread per file is fine.
Alexey Kopytov (akopytov) wrote : | # |
As discussed on IRC, the feature will be implemented by adding a --decompress-
Alexey Kopytov (akopytov) wrote : | # |
After further analysis and discussions it was decided that this feature does not really have real applications, while complicating the xbstream architecture significantly.
Users willing to utilize compression only to speed up transferring the entire archive to another host do not really need built-in compression in XtraBackup. That feature was implemented for different use cases. Instead, users can use normal uncompressed stream in XtraBackup (either tar, or xbstream in case of parallel or incremental backups) and then use external compression utilities (either single- or multi-threaded) to compress the stream and decompress it on remote host.
Unmerged revisions
- 462. By Sergei Glushchenko
-
BP https:/
/blueprints. launchpad. net/percona- xtrabackup/ +spec/xbstream- decompress
Support for decompression in xbstream. New option -z been added to
automatically decompress .qp files inside the stream. decompress datasink
been implemented for qpress10 one-file archives extraction.
Preview Diff
1 | === modified file 'src/Makefile' |
2 | --- src/Makefile 2012-02-10 20:05:56 +0000 |
3 | +++ src/Makefile 2012-08-19 12:20:47 +0000 |
4 | @@ -24,7 +24,7 @@ |
5 | |
6 | COMMON_INC = -I. -I libarchive/libarchive -I quicklz |
7 | XTRABACKUPOBJS = xtrabackup.o stream.o local.o compress.o xbstream_write.o \ |
8 | - quicklz/quicklz.o |
9 | + quicklz/quicklz.o decompress.o |
10 | XBSTREAMOBJS = xbstream.o xbstream_write.o xbstream_read.o |
11 | |
12 | LIBARCHIVE_A = libarchive/libarchive/libarchive.a |
13 | @@ -176,7 +176,7 @@ |
14 | xbstream.o xbstream_read.o: %.o: %.c |
15 | $(CC) $(CFLAGS) $(INC) $(DEFS) -c $< -o $@ |
16 | |
17 | -xbstream: $(XBSTREAMOBJS) $(MYSQLOBJS) local.o |
18 | +xbstream: $(XBSTREAMOBJS) $(MYSQLOBJS) local.o decompress.o quicklz/quicklz.o |
19 | $(CC) $(CFLAGS) $^ $(INC) $(MYSQLOBJS) $(LIBS) -o $@ |
20 | |
21 | xtrabackup.o: xtrabackup.c xb_regex.h |
22 | |
23 | === modified file 'src/datasink.h' |
24 | --- src/datasink.h 2012-02-10 20:05:56 +0000 |
25 | +++ src/datasink.h 2012-08-19 12:20:47 +0000 |
26 | @@ -30,6 +30,7 @@ |
27 | struct datasink_struct *datasink; |
28 | char *root; |
29 | void *ptr; |
30 | + struct ds_ctxt *pipe_ctxt; |
31 | } ds_ctxt_t; |
32 | |
33 | typedef struct { |
34 | |
35 | === added file 'src/decompress.c' |
36 | --- src/decompress.c 1970-01-01 00:00:00 +0000 |
37 | +++ src/decompress.c 2012-08-19 12:20:47 +0000 |
38 | @@ -0,0 +1,453 @@ |
39 | +/****************************************************** |
40 | +Copyright (c) 2011 Percona Inc. |
41 | + |
42 | +Compressing datasink implementation for XtraBackup. |
43 | + |
44 | +This program is free software; you can redistribute it and/or modify |
45 | +it under the terms of the GNU General Public License as published by |
46 | +the Free Software Foundation; version 2 of the License. |
47 | + |
48 | +This program is distributed in the hope that it will be useful, |
49 | +but WITHOUT ANY WARRANTY; without even the implied warranty of |
50 | +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
51 | +GNU General Public License for more details. |
52 | + |
53 | +You should have received a copy of the GNU General Public License |
54 | +along with this program; if not, write to the Free Software |
55 | +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA |
56 | + |
57 | +*******************************************************/ |
58 | + |
59 | + |
60 | +#include <my_base.h> |
61 | +#include <quicklz.h> |
62 | +#include <univ.i> |
63 | +#include <zlib.h> |
64 | +#include "common.h" |
65 | +#include "datasink.h" |
66 | + |
67 | +#define COMPRESS_CHUNK_SIZE (64 * 1024UL) |
68 | +#define MY_QLZ_COMPRESS_OVERHEAD 400 |
69 | + |
70 | +#define DS_DECOMPRESS_BUF_SIZE (COMPRESS_CHUNK_SIZE * 10UL) |
71 | + |
72 | +typedef struct { |
73 | + pthread_t id; |
74 | + pthread_mutex_t data_mutex; |
75 | + pthread_cond_t not_full_cond; |
76 | + pthread_cond_t not_empty_cond; |
77 | + char data[DS_DECOMPRESS_BUF_SIZE]; |
78 | + char* rptr; |
79 | + char* wptr; |
80 | + size_t bytes_avail; |
81 | + size_t data_size; |
82 | + my_bool eof; |
83 | + my_bool error; |
84 | + ds_ctxt_t *ds; |
85 | +} ds_decomp_ctxt_t; |
86 | + |
87 | +static |
88 | +void * |
89 | +deqpress_worker_thread_func(void *arg); |
90 | + |
91 | +static ds_ctxt_t *decomp_init(const char *root); |
92 | +static ds_file_t *decomp_open(ds_ctxt_t *ctxt, const char *path, |
93 | + MY_STAT *mystat); |
94 | +static int decomp_write(ds_file_t *file, const void *buf, size_t len); |
95 | +static int decomp_close(ds_file_t *file); |
96 | +static void decomp_deinit(ds_ctxt_t *ctxt); |
97 | + |
98 | +datasink_t datasink_decompress = { |
99 | + &decomp_init, |
100 | + &decomp_open, |
101 | + &decomp_write, |
102 | + &decomp_close, |
103 | + &decomp_deinit |
104 | +}; |
105 | + |
106 | +static |
107 | +ds_ctxt_t * |
108 | +decomp_init(const char *root) |
109 | +{ |
110 | + ds_ctxt_t *ctxt; |
111 | + |
112 | + ctxt = (ds_ctxt_t *) my_malloc(sizeof(ds_ctxt_t), |
113 | + MYF(MY_FAE)); |
114 | + |
115 | + ctxt->ptr = NULL; |
116 | + ctxt->root = my_strdup(root, MYF(MY_FAE)); |
117 | + ctxt->datasink = &datasink_decompress; |
118 | + |
119 | + return ctxt; |
120 | +} |
121 | + |
122 | +static |
123 | +void |
124 | +decomp_deinit(ds_ctxt_t *ctxt) |
125 | +{ |
126 | + MY_FREE(ctxt->root); |
127 | + MY_FREE(ctxt); |
128 | +} |
129 | + |
130 | +static |
131 | +ds_file_t * |
132 | +decomp_open(ds_ctxt_t *ctxt, const char *path, |
133 | + MY_STAT *mystat __attribute__((__unused__))) |
134 | +{ |
135 | + ds_decomp_ctxt_t *decomp_ctxt; |
136 | + ds_file_t *file; |
137 | + |
138 | + xb_a(ctxt->pipe_ctxt != NULL); |
139 | + |
140 | + file = (ds_file_t *) my_malloc(sizeof(ds_file_t) + |
141 | + sizeof(ds_decomp_ctxt_t), |
142 | + MYF(MY_FAE)); |
143 | + |
144 | + decomp_ctxt = (ds_decomp_ctxt_t *) (file + 1); |
145 | + file->ptr = decomp_ctxt; |
146 | + file->path = my_strdup(path, MYF(MY_FAE)); |
147 | + |
148 | + decomp_ctxt->rptr = |
149 | + decomp_ctxt->wptr = decomp_ctxt->data; |
150 | + decomp_ctxt->eof = decomp_ctxt->error = FALSE; |
151 | + decomp_ctxt->bytes_avail = 0; |
152 | + decomp_ctxt->data_size = DS_DECOMPRESS_BUF_SIZE; |
153 | + decomp_ctxt->ds = (ds_ctxt_t *) ctxt->pipe_ctxt; |
154 | + |
155 | + pthread_mutex_init(&decomp_ctxt->data_mutex, NULL); |
156 | + pthread_cond_init(&decomp_ctxt->not_full_cond, NULL); |
157 | + pthread_cond_init(&decomp_ctxt->not_empty_cond, NULL); |
158 | + |
159 | + if (pthread_create(&decomp_ctxt->id, NULL, |
160 | + deqpress_worker_thread_func, file)) { |
161 | + msg("compress: pthread_create() failed: " |
162 | + "errno = %d\n", errno); |
163 | + return NULL; |
164 | + } |
165 | + |
166 | + return file; |
167 | +} |
168 | + |
169 | +static |
170 | +int |
171 | +decomp_write(ds_file_t *file, const void *buf, size_t len) |
172 | +{ |
173 | + ds_decomp_ctxt_t *decomp_ctxt; |
174 | + const char *ptr = (const char*) buf; |
175 | + |
176 | + decomp_ctxt = (ds_decomp_ctxt_t *) file->ptr; |
177 | + |
178 | + while (len > 0) { |
179 | + |
180 | + size_t n; |
181 | + |
182 | + pthread_mutex_lock(&decomp_ctxt->data_mutex); |
183 | + |
184 | + while ((decomp_ctxt->bytes_avail |
185 | + == decomp_ctxt->data_size) && |
186 | + !decomp_ctxt->error) { |
187 | + pthread_cond_wait( |
188 | + &decomp_ctxt->not_full_cond, |
189 | + &decomp_ctxt->data_mutex); |
190 | + } |
191 | + |
192 | + if (decomp_ctxt->error) { |
193 | + pthread_mutex_unlock(&decomp_ctxt->data_mutex); |
194 | + return 1; |
195 | + } |
196 | + |
197 | + n = len < (decomp_ctxt->data_size - decomp_ctxt->bytes_avail) ? |
198 | + len : (decomp_ctxt->data_size - decomp_ctxt->bytes_avail); |
199 | + |
200 | + if (decomp_ctxt->wptr + n > |
201 | + decomp_ctxt->data + decomp_ctxt->data_size) { |
202 | + n = decomp_ctxt->data + decomp_ctxt->data_size - |
203 | + decomp_ctxt->wptr; |
204 | + } |
205 | + |
206 | + memcpy(decomp_ctxt->wptr, ptr, n); |
207 | + |
208 | + len -= n; |
209 | + ptr += n; |
210 | + decomp_ctxt->wptr += n; |
211 | + decomp_ctxt->bytes_avail += n; |
212 | + |
213 | + if (decomp_ctxt->wptr == |
214 | + decomp_ctxt->data + decomp_ctxt->data_size) { |
215 | + decomp_ctxt->wptr = decomp_ctxt->data; |
216 | + } |
217 | + |
218 | + if (decomp_ctxt->bytes_avail > decomp_ctxt->data_size / 5) |
219 | + pthread_cond_signal(&decomp_ctxt->not_empty_cond); |
220 | + |
221 | + pthread_mutex_unlock(&decomp_ctxt->data_mutex); |
222 | + |
223 | + } |
224 | + |
225 | + return 0; |
226 | +} |
227 | + |
228 | +static |
229 | +my_bool |
230 | +decomp_read(ds_file_t *file, const void *buf, size_t len) |
231 | +{ |
232 | + ds_decomp_ctxt_t *decomp_ctxt; |
233 | + char *ptr = (char*) buf; |
234 | + |
235 | + decomp_ctxt = (ds_decomp_ctxt_t *) file->ptr; |
236 | + |
237 | + while (len > 0) { |
238 | + |
239 | + size_t n; |
240 | + |
241 | + pthread_mutex_lock(&decomp_ctxt->data_mutex); |
242 | + |
243 | + while (decomp_ctxt->bytes_avail == 0 && |
244 | + !decomp_ctxt->eof) { |
245 | + pthread_cond_wait( |
246 | + &decomp_ctxt->not_empty_cond, |
247 | + &decomp_ctxt->data_mutex); |
248 | + } |
249 | + |
250 | + if (decomp_ctxt->bytes_avail == 0 && |
251 | + decomp_ctxt->eof) { |
252 | + pthread_mutex_unlock(&decomp_ctxt->data_mutex); |
253 | + return FALSE; |
254 | + } |
255 | + |
256 | + n = len < decomp_ctxt->bytes_avail ? |
257 | + len : decomp_ctxt->bytes_avail; |
258 | + |
259 | + |
260 | + if (decomp_ctxt->rptr + n > |
261 | + decomp_ctxt->data + decomp_ctxt->data_size) { |
262 | + n = decomp_ctxt->data + decomp_ctxt->data_size - |
263 | + decomp_ctxt->rptr; |
264 | + } |
265 | + |
266 | + if (ptr) { |
267 | + memcpy(ptr, decomp_ctxt->rptr, n); |
268 | + ptr += n; |
269 | + } |
270 | + |
271 | + len -= n; |
272 | + decomp_ctxt->rptr += n; |
273 | + decomp_ctxt->bytes_avail -= n; |
274 | + |
275 | + if (decomp_ctxt->rptr == |
276 | + decomp_ctxt->data + decomp_ctxt->data_size) { |
277 | + decomp_ctxt->rptr = decomp_ctxt->data; |
278 | + } |
279 | + |
280 | + pthread_cond_signal(&decomp_ctxt->not_full_cond); |
281 | + |
282 | + pthread_mutex_unlock(&decomp_ctxt->data_mutex); |
283 | + |
284 | + } |
285 | + |
286 | + return TRUE; |
287 | +} |
288 | + |
289 | +static |
290 | +my_bool |
291 | +decomp_is_eof(ds_file_t *file) |
292 | +{ |
293 | + return ((ds_decomp_ctxt_t *) file->ptr)->eof; |
294 | +} |
295 | + |
296 | +static |
297 | +int |
298 | +decomp_close(ds_file_t *file) |
299 | +{ |
300 | + ds_decomp_ctxt_t *decomp_ctxt; |
301 | + |
302 | + decomp_ctxt = (ds_decomp_ctxt_t *) file->ptr; |
303 | + |
304 | + pthread_mutex_lock(&decomp_ctxt->data_mutex); |
305 | + |
306 | + decomp_ctxt->eof = TRUE; |
307 | + |
308 | + pthread_cond_broadcast(&decomp_ctxt->not_empty_cond); |
309 | + |
310 | + pthread_mutex_unlock(&decomp_ctxt->data_mutex); |
311 | + |
312 | + pthread_join(decomp_ctxt->id, NULL); |
313 | + |
314 | + pthread_mutex_destroy(&decomp_ctxt->data_mutex); |
315 | + pthread_cond_destroy(&decomp_ctxt->not_full_cond); |
316 | + pthread_cond_destroy(&decomp_ctxt->not_empty_cond); |
317 | + |
318 | + MY_FREE(file->path); |
319 | + MY_FREE(file); |
320 | + |
321 | + return 0; |
322 | +} |
323 | + |
324 | +static |
325 | +void |
326 | +decomp_set_error(ds_file_t *file) |
327 | +{ |
328 | + ds_decomp_ctxt_t *decomp_ctxt; |
329 | + |
330 | + decomp_ctxt = (ds_decomp_ctxt_t *) file->ptr; |
331 | + |
332 | + pthread_mutex_lock(&decomp_ctxt->data_mutex); |
333 | + |
334 | + decomp_ctxt->error = TRUE; |
335 | + |
336 | + pthread_cond_broadcast(&decomp_ctxt->not_full_cond); |
337 | + |
338 | + pthread_mutex_unlock(&decomp_ctxt->data_mutex); |
339 | +} |
340 | + |
341 | +#define DECOMP_READ(file, buf, len) \ |
342 | + if (!decomp_read(file, buf, len)) { \ |
343 | + msg("decompress: error read %s\n", file->path); \ |
344 | + goto error; \ |
345 | + } |
346 | + |
347 | + |
348 | +static |
349 | +void * |
350 | +deqpress_worker_thread_func(void *arg) |
351 | +{ |
352 | + ds_file_t *file = (ds_file_t *) arg; |
353 | + ds_ctxt_t *ds = ((ds_decomp_ctxt_t *) file->ptr)->ds; |
354 | + char *comp_chunk = NULL, *decomp_chunk = NULL; |
355 | + char tmp[10]; |
356 | + ds_file_t *arcfile = NULL; |
357 | + ulong filename_len; |
358 | + char filename[FN_REFLEN]; |
359 | + char dirname[FN_REFLEN]; |
360 | + size_t dirpath_len; |
361 | + |
362 | + my_thread_init(); |
363 | + |
364 | + DECOMP_READ(file, tmp, 8); |
365 | + |
366 | + if (strncmp(tmp, "qpress10", 8) != 0) { |
367 | + msg("decompress: source was not compressed with qpress %s\n", |
368 | + file->path); |
369 | + goto error; |
370 | + } |
371 | + |
372 | + DECOMP_READ(file, tmp, 8); |
373 | + |
374 | + ulonglong compress_chunk_size = uint8korr(tmp); |
375 | + if (compress_chunk_size > 512*1024*1024) { |
376 | + msg("decompress: source is corrupted: %s\n", file->path); |
377 | + goto error; |
378 | + } |
379 | + |
380 | + DECOMP_READ(file, tmp, 1); |
381 | + if (tmp[0] != 'F') { |
382 | + msg("decompress: %s: xbtream can decompress only single-file " |
383 | + "qpress archives for now\n", file->path); |
384 | + goto error; |
385 | + } |
386 | + |
387 | + DECOMP_READ(file, tmp, 4); |
388 | + |
389 | + filename_len = uint4korr(tmp); |
390 | + |
391 | + dirname_part(dirname, file->path, &dirpath_len); |
392 | + char *filename_short = my_malloc(filename_len + 1, MYF(MY_FAE)); |
393 | + DECOMP_READ(file, filename_short, filename_len + 1); |
394 | + snprintf(filename, FN_REFLEN, |
395 | + "%s/%s/%s", ds->root, dirname, filename_short); |
396 | + MY_FREE(filename_short); |
397 | + |
398 | + MY_STAT mystat; |
399 | + arcfile = ds->datasink->open(ds, filename, &mystat); |
400 | + if (arcfile == NULL) { |
401 | + msg("decompress: failed to create file %s.\n", filename); |
402 | + goto error; |
403 | + } |
404 | + |
405 | + comp_chunk = my_malloc((compress_chunk_size + |
406 | + MY_QLZ_COMPRESS_OVERHEAD) * 2, MYF(MY_FAE)); |
407 | + decomp_chunk = comp_chunk + compress_chunk_size + MY_QLZ_COMPRESS_OVERHEAD; |
408 | + qlz_state_decompress state; |
409 | + |
410 | + while (1) { |
411 | + DECOMP_READ(file, tmp, 8); |
412 | + |
413 | + if (strncmp(tmp, "NEWBNEWB", 8) == 0) { |
414 | + DECOMP_READ(file, tmp, 8); |
415 | + DECOMP_READ(file, tmp, 4); |
416 | + |
417 | + ulong adler = uint4korr(tmp); |
418 | + |
419 | + DECOMP_READ(file, comp_chunk, 9); |
420 | + |
421 | + size_t size_compressed = |
422 | + qlz_size_compressed(comp_chunk); |
423 | + if (size_compressed > compress_chunk_size + |
424 | + MY_QLZ_COMPRESS_OVERHEAD) { |
425 | + msg("decompress: chunk is too large %s\n", |
426 | + file->path); |
427 | + goto error; |
428 | + } |
429 | + |
430 | + DECOMP_READ(file, comp_chunk + 9, size_compressed - 9); |
431 | + |
432 | + ulint adler_read = adler32(0x00000001, comp_chunk, |
433 | + size_compressed); |
434 | + size_t size_decompressed = qlz_decompress(comp_chunk, |
435 | + decomp_chunk, &state); |
436 | + |
437 | + if (size_decompressed > COMPRESS_CHUNK_SIZE |
438 | + + MY_QLZ_COMPRESS_OVERHEAD) { |
439 | + msg("decompress: buffer overflow\n"); |
440 | + goto error; |
441 | + } |
442 | + |
443 | + if (adler != adler_read) { |
444 | + msg("decompress: corrupted %s, " |
445 | + "checksum mismatch\n", |
446 | + file->path); |
447 | + goto error; |
448 | + } |
449 | + |
450 | + ds->datasink->write(arcfile, decomp_chunk, |
451 | + size_decompressed); |
452 | + |
453 | + continue; |
454 | + } |
455 | + |
456 | + if (strncmp(tmp, "ENDSENDS", 8) == 0) { |
457 | + DECOMP_READ(file, tmp, 8); |
458 | + break; |
459 | + } |
460 | + |
461 | + msg("decompress: corrupted %s\n", file->path); |
462 | + goto error; |
463 | + } |
464 | + |
465 | + if (decomp_read(file, tmp, 1)) { |
466 | + msg("decompress: error, EOF expected\n"); |
467 | + goto error; |
468 | + } |
469 | + |
470 | + if (!decomp_is_eof(file)) { |
471 | + msg("decompress: error, EOF expected\n"); |
472 | + goto error; |
473 | + } |
474 | + |
475 | + if (arcfile) { |
476 | + ds->datasink->close(arcfile); |
477 | + } |
478 | + MY_FREE(comp_chunk); |
479 | + my_thread_end(); |
480 | + return NULL; |
481 | + |
482 | +error: |
483 | + if (arcfile) { |
484 | + ds->datasink->close(arcfile); |
485 | + } |
486 | + decomp_set_error(file); |
487 | + MY_FREE(comp_chunk); |
488 | + my_thread_end(); |
489 | + return NULL; |
490 | + |
491 | +} |
492 | |
493 | === added file 'src/decompress.h' |
494 | --- src/decompress.h 1970-01-01 00:00:00 +0000 |
495 | +++ src/decompress.h 2012-08-19 12:20:47 +0000 |
496 | @@ -0,0 +1,28 @@ |
497 | +/****************************************************** |
498 | +Copyright (c) 2011 Percona Inc. |
499 | + |
500 | +Compression interface for XtraBackup. |
501 | + |
502 | +This program is free software; you can redistribute it and/or modify |
503 | +it under the terms of the GNU General Public License as published by |
504 | +the Free Software Foundation; version 2 of the License. |
505 | + |
506 | +This program is distributed in the hope that it will be useful, |
507 | +but WITHOUT ANY WARRANTY; without even the implied warranty of |
508 | +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
509 | +GNU General Public License for more details. |
510 | + |
511 | +You should have received a copy of the GNU General Public License |
512 | +along with this program; if not, write to the Free Software |
513 | +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA |
514 | + |
515 | +*******************************************************/ |
516 | + |
517 | +#ifndef DS_DEDSCOMPRESS_H |
518 | +#define DS_DEDSCOMPRESS_H |
519 | + |
520 | +#include "datasink.h" |
521 | + |
522 | +extern datasink_t datasink_decompress; |
523 | + |
524 | +#endif |
525 | |
526 | === modified file 'src/xbstream.c' |
527 | --- src/xbstream.c 2012-02-10 20:05:56 +0000 |
528 | +++ src/xbstream.c 2012-08-19 12:20:47 +0000 |
529 | @@ -24,6 +24,7 @@ |
530 | #include "common.h" |
531 | #include "xbstream.h" |
532 | #include "local.h" |
533 | +#include "decompress.h" |
534 | |
535 | #define XBSTREAM_VERSION "1.0" |
536 | #define XBSTREAM_BUFFER_SIZE (1024 * 1024UL) |
537 | @@ -39,6 +40,7 @@ |
538 | static run_mode_t opt_mode; |
539 | static char * opt_directory = NULL; |
540 | static my_bool opt_verbose = 0; |
541 | +static my_bool opt_decompress = 0; |
542 | |
543 | static struct my_option my_long_options[] = |
544 | { |
545 | @@ -54,6 +56,9 @@ |
546 | GET_STR_ALLOC, REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, |
547 | {"verbose", 'v', "Print verbose output.", &opt_verbose, &opt_verbose, |
548 | 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0}, |
549 | + {"decompress", 'z', "Decompress compressed files.", |
550 | + &opt_decompress, &opt_decompress, |
551 | + 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0}, |
552 | |
553 | {0, 0, 0, 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0} |
554 | }; |
555 | @@ -362,7 +367,9 @@ |
556 | HASH filehash; |
557 | file_entry_t *entry; |
558 | datasink_t *ds; |
559 | + datasink_t *ds_decomp; |
560 | ds_ctxt_t *ds_ctxt; |
561 | + ds_ctxt_t *ds_ctxt_decomp; |
562 | |
563 | stream = xb_stream_read_new(); |
564 | if (stream == NULL) { |
565 | @@ -372,7 +379,10 @@ |
566 | |
567 | /* If --directory is specified, it is already set as CWD by now. */ |
568 | ds = &datasink_local; |
569 | + ds_decomp = &datasink_decompress; |
570 | ds_ctxt = ds->init("."); |
571 | + ds_ctxt_decomp = ds_decomp->init("."); |
572 | + ds_ctxt_decomp->pipe_ctxt = ds_ctxt; |
573 | |
574 | if (my_hash_init(&filehash, &my_charset_bin, START_FILE_HASH_SIZE, |
575 | 0, 0, (my_hash_get_key) get_file_entry_key, |
576 | @@ -395,8 +405,18 @@ |
577 | chunk.pathlen); |
578 | |
579 | if (entry == NULL) { |
580 | - entry = file_entry_new(ds_ctxt, chunk.path, |
581 | - chunk.pathlen); |
582 | + if (opt_decompress && |
583 | + strlen(chunk.path) > 3 && |
584 | + strcmp(chunk.path + strlen(chunk.path) - 3, |
585 | + ".qp") == 0 && |
586 | + memcmp(chunk.data, "qpress10", 8) == 0) { |
587 | + entry = file_entry_new(ds_ctxt_decomp, chunk.path, |
588 | + chunk.pathlen); |
589 | + } else { |
590 | + entry = file_entry_new(ds_ctxt, chunk.path, |
591 | + chunk.pathlen); |
592 | + |
593 | + } |
594 | if (entry == NULL) { |
595 | goto err; |
596 | } |
597 | @@ -420,7 +440,7 @@ |
598 | goto err; |
599 | } |
600 | |
601 | - if (ds->write(entry->file, chunk.data, chunk.length)) { |
602 | + if (entry->ds_ctxt->datasink->write(entry->file, chunk.data, chunk.length)) { |
603 | msg("%s: my_write() failed.\n", my_progname); |
604 | goto err; |
605 | } |
606 | @@ -434,12 +454,14 @@ |
607 | |
608 | my_hash_free(&filehash); |
609 | ds->deinit(ds_ctxt); |
610 | + ds_decomp->deinit(ds_ctxt_decomp); |
611 | xb_stream_read_done(stream); |
612 | |
613 | return 0; |
614 | err: |
615 | my_hash_free(&filehash); |
616 | ds->deinit(ds_ctxt); |
617 | + ds_decomp->deinit(ds_ctxt_decomp); |
618 | xb_stream_read_done(stream); |
619 | |
620 | return 1; |
621 | |
622 | === added file 'test/t/ib_stream_decompress.sh' |
623 | --- test/t/ib_stream_decompress.sh 1970-01-01 00:00:00 +0000 |
624 | +++ test/t/ib_stream_decompress.sh 2012-08-19 12:20:47 +0000 |
625 | @@ -0,0 +1,9 @@ |
626 | +############################################################################ |
627 | +# Test streaming + compression (decompression with xbstream) |
628 | +############################################################################ |
629 | + |
630 | +stream_format=xbstream |
631 | +stream_extract_cmd="xbstream -xvz <" |
632 | +innobackupex_options="--compress --compress-threads=4 --parallel=4" |
633 | + |
634 | +. inc/ib_stream_common.sh |
http:// jenkins. percona. com/view/ XtraBackup/ job/percona- xtrabackup- 2.0-param/ 243/