Merge lp:~jameinel/bzr/1.15-pack-source into lp:~bzr/bzr/trunk-old
Status: Merged
Merged at revision: not available
Proposed branch: lp:~jameinel/bzr/1.15-pack-source
Merge into: lp:~bzr/bzr/trunk-old
Diff against target: 824 lines
To merge this branch: bzr merge lp:~jameinel/bzr/1.15-pack-source
Related bugs:
Reviewer | Review Type | Date Requested | Status
---|---|---|---
Martin Pool | | 2009-06-02 | Approve on 2009-06-16
Description of the change
John A Meinel (jameinel) wrote:
Martin Pool (mbp) wrote:
This looks ok to me, though you might want to run the concept past Robert.
Robert Collins (lifeless) wrote:
On Tue, 2009-06-16 at 05:33 +0000, Martin Pool wrote:
> Review: Approve
> This looks ok to me, though you might want to run the concept past Robert.
Conceptually fine. Using Packer was a hack when we had no interface able
to be efficient back in the days of single VersionedFile and Knits.
-Rob
- 4374. By John A Meinel on 2009-06-17

  Merge bzr.dev 4454 in preparation for NEWS entry.

- 4375. By John A Meinel on 2009-06-17

  NEWS entry about PackStreamSource

- 4376. By John A Meinel on 2009-06-17

  It seems that fetch() no longer returns the number of revisions fetched.
  It still does for *some* InterRepository fetch paths, but the generic one does not.
  It is also not easy to get it to, since the Source and Sink are the ones that would
  know how many keys were transmitted, and they are potentially 'remote' objects.
  This was also only tested to occur as a by-product in a random 'test_commit' test.
  I assume if we really wanted the assurance, we would have a per_repo or interrepo
  test for it.

- 4377. By John A Meinel on 2009-06-18

  Change insert_from_broken_repo into an expectedFailure.
  This has to do with bug #389141.
Preview Diff
1 | === modified file 'bzrlib/fetch.py' |
2 | --- bzrlib/fetch.py 2009-06-10 03:56:49 +0000 |
3 | +++ bzrlib/fetch.py 2009-06-16 02:36:36 +0000 |
4 | @@ -51,9 +51,6 @@ |
5 | :param last_revision: If set, try to limit to the data this revision |
6 | references. |
7 | :param find_ghosts: If True search the entire history for ghosts. |
8 | - :param _write_group_acquired_callable: Don't use; this parameter only |
9 | - exists to facilitate a hack done in InterPackRepo.fetch. We would |
10 | - like to remove this parameter. |
11 | :param pb: ProgressBar object to use; deprecated and ignored. |
12 | This method will just create one on top of the stack. |
13 | """ |
14 | |
15 | === modified file 'bzrlib/repofmt/groupcompress_repo.py' |
16 | --- bzrlib/repofmt/groupcompress_repo.py 2009-06-12 01:11:00 +0000 |
17 | +++ bzrlib/repofmt/groupcompress_repo.py 2009-06-16 02:36:36 +0000 |
18 | @@ -48,6 +48,7 @@ |
19 | Pack, |
20 | NewPack, |
21 | KnitPackRepository, |
22 | + KnitPackStreamSource, |
23 | PackRootCommitBuilder, |
24 | RepositoryPackCollection, |
25 | RepositoryFormatPack, |
26 | @@ -736,21 +737,10 @@ |
27 | # make it raise to trap naughty direct users. |
28 | raise NotImplementedError(self._iter_inventory_xmls) |
29 | |
30 | - def _find_parent_ids_of_revisions(self, revision_ids): |
31 | - # TODO: we probably want to make this a helper that other code can get |
32 | - # at |
33 | - parent_map = self.get_parent_map(revision_ids) |
34 | - parents = set() |
35 | - map(parents.update, parent_map.itervalues()) |
36 | - parents.difference_update(revision_ids) |
37 | - parents.discard(_mod_revision.NULL_REVISION) |
38 | - return parents |
39 | - |
40 | - def _find_present_inventory_ids(self, revision_ids): |
41 | - keys = [(r,) for r in revision_ids] |
42 | - parent_map = self.inventories.get_parent_map(keys) |
43 | - present_inventory_ids = set(k[-1] for k in parent_map) |
44 | - return present_inventory_ids |
45 | + def _find_present_inventory_keys(self, revision_keys): |
46 | + parent_map = self.inventories.get_parent_map(revision_keys) |
47 | + present_inventory_keys = set(k for k in parent_map) |
48 | + return present_inventory_keys |
49 | |
50 | def fileids_altered_by_revision_ids(self, revision_ids, _inv_weave=None): |
51 | """Find the file ids and versions affected by revisions. |
52 | @@ -767,12 +757,20 @@ |
53 | file_id_revisions = {} |
54 | pb = ui.ui_factory.nested_progress_bar() |
55 | try: |
56 | - parent_ids = self._find_parent_ids_of_revisions(revision_ids) |
57 | - present_parent_inv_ids = self._find_present_inventory_ids(parent_ids) |
58 | + revision_keys = [(r,) for r in revision_ids] |
59 | + parent_keys = self._find_parent_keys_of_revisions(revision_keys) |
60 | + # TODO: instead of using _find_present_inventory_keys, change the |
61 | + # code paths to allow missing inventories to be tolerated. |
62 | + # However, we only want to tolerate missing parent |
63 | + # inventories, not missing inventories for revision_ids |
64 | + present_parent_inv_keys = self._find_present_inventory_keys( |
65 | + parent_keys) |
66 | + present_parent_inv_ids = set( |
67 | + [k[-1] for k in present_parent_inv_keys]) |
68 | uninteresting_root_keys = set() |
69 | interesting_root_keys = set() |
70 | - inventories_to_read = set(present_parent_inv_ids) |
71 | - inventories_to_read.update(revision_ids) |
72 | + inventories_to_read = set(revision_ids) |
73 | + inventories_to_read.update(present_parent_inv_ids) |
74 | for inv in self.iter_inventories(inventories_to_read): |
75 | entry_chk_root_key = inv.id_to_entry.key() |
76 | if inv.revision_id in present_parent_inv_ids: |
77 | @@ -846,7 +844,7 @@ |
78 | return super(CHKInventoryRepository, self)._get_source(to_format) |
79 | |
80 | |
81 | -class GroupCHKStreamSource(repository.StreamSource): |
82 | +class GroupCHKStreamSource(KnitPackStreamSource): |
83 | """Used when both the source and target repo are GroupCHK repos.""" |
84 | |
85 | def __init__(self, from_repository, to_format): |
86 | @@ -854,6 +852,7 @@ |
87 | super(GroupCHKStreamSource, self).__init__(from_repository, to_format) |
88 | self._revision_keys = None |
89 | self._text_keys = None |
90 | + self._text_fetch_order = 'groupcompress' |
91 | self._chk_id_roots = None |
92 | self._chk_p_id_roots = None |
93 | |
94 | @@ -898,16 +897,10 @@ |
95 | p_id_roots_set.clear() |
96 | return ('inventories', _filtered_inv_stream()) |
97 | |
98 | - def _find_present_inventories(self, revision_ids): |
99 | - revision_keys = [(r,) for r in revision_ids] |
100 | - inventories = self.from_repository.inventories |
101 | - present_inventories = inventories.get_parent_map(revision_keys) |
102 | - return [p[-1] for p in present_inventories] |
103 | - |
104 | - def _get_filtered_chk_streams(self, excluded_revision_ids): |
105 | + def _get_filtered_chk_streams(self, excluded_revision_keys): |
106 | self._text_keys = set() |
107 | - excluded_revision_ids.discard(_mod_revision.NULL_REVISION) |
108 | - if not excluded_revision_ids: |
109 | + excluded_revision_keys.discard(_mod_revision.NULL_REVISION) |
110 | + if not excluded_revision_keys: |
111 | uninteresting_root_keys = set() |
112 | uninteresting_pid_root_keys = set() |
113 | else: |
114 | @@ -915,9 +908,9 @@ |
115 | # actually present |
116 | # TODO: Update Repository.iter_inventories() to add |
117 | # ignore_missing=True |
118 | - present_ids = self.from_repository._find_present_inventory_ids( |
119 | - excluded_revision_ids) |
120 | - present_ids = self._find_present_inventories(excluded_revision_ids) |
121 | + present_keys = self.from_repository._find_present_inventory_keys( |
122 | + excluded_revision_keys) |
123 | + present_ids = [k[-1] for k in present_keys] |
124 | uninteresting_root_keys = set() |
125 | uninteresting_pid_root_keys = set() |
126 | for inv in self.from_repository.iter_inventories(present_ids): |
127 | @@ -948,14 +941,6 @@ |
128 | self._chk_p_id_roots = None |
129 | yield 'chk_bytes', _get_parent_id_basename_to_file_id_pages() |
130 | |
131 | - def _get_text_stream(self): |
132 | - # Note: We know we don't have to handle adding root keys, because both |
133 | - # the source and target are GCCHK, and those always support rich-roots |
134 | - # We may want to request as 'unordered', in case the source has done a |
135 | - # 'split' packing |
136 | - return ('texts', self.from_repository.texts.get_record_stream( |
137 | - self._text_keys, 'groupcompress', False)) |
138 | - |
139 | def get_stream(self, search): |
140 | revision_ids = search.get_keys() |
141 | for stream_info in self._fetch_revision_texts(revision_ids): |
142 | @@ -966,8 +951,9 @@ |
143 | # For now, exclude all parents that are at the edge of ancestry, for |
144 | # which we have inventories |
145 | from_repo = self.from_repository |
146 | - parent_ids = from_repo._find_parent_ids_of_revisions(revision_ids) |
147 | - for stream_info in self._get_filtered_chk_streams(parent_ids): |
148 | + parent_keys = from_repo._find_parent_keys_of_revisions( |
149 | + self._revision_keys) |
150 | + for stream_info in self._get_filtered_chk_streams(parent_keys): |
151 | yield stream_info |
152 | yield self._get_text_stream() |
153 | |
154 | @@ -991,8 +977,8 @@ |
155 | # no unavailable texts when the ghost inventories are not filled in. |
156 | yield self._get_inventory_stream(missing_inventory_keys, |
157 | allow_absent=True) |
158 | - # We use the empty set for excluded_revision_ids, to make it clear that |
159 | - # we want to transmit all referenced chk pages. |
160 | + # We use the empty set for excluded_revision_keys, to make it clear |
161 | + # that we want to transmit all referenced chk pages. |
162 | for stream_info in self._get_filtered_chk_streams(set()): |
163 | yield stream_info |
164 | |
165 | |
166 | === modified file 'bzrlib/repofmt/pack_repo.py' |
167 | --- bzrlib/repofmt/pack_repo.py 2009-06-10 03:56:49 +0000 |
168 | +++ bzrlib/repofmt/pack_repo.py 2009-06-16 02:36:36 +0000 |
169 | @@ -73,6 +73,7 @@ |
170 | MetaDirRepositoryFormat, |
171 | RepositoryFormat, |
172 | RootCommitBuilder, |
173 | + StreamSource, |
174 | ) |
175 | import bzrlib.revision as _mod_revision |
176 | from bzrlib.trace import ( |
177 | @@ -2265,6 +2266,11 @@ |
178 | pb.finished() |
179 | return result |
180 | |
181 | + def _get_source(self, to_format): |
182 | + if to_format.network_name() == self._format.network_name(): |
183 | + return KnitPackStreamSource(self, to_format) |
184 | + return super(KnitPackRepository, self)._get_source(to_format) |
185 | + |
186 | def _make_parents_provider(self): |
187 | return graph.CachingParentsProvider(self) |
188 | |
189 | @@ -2384,6 +2390,79 @@ |
190 | repo.unlock() |
191 | |
192 | |
193 | +class KnitPackStreamSource(StreamSource): |
194 | + """A StreamSource used to transfer data between same-format KnitPack repos. |
195 | + |
196 | + This source assumes: |
197 | + 1) Same serialization format for all objects |
198 | + 2) Same root information |
199 | + 3) XML format inventories |
200 | + 4) Atomic inserts (so we can stream inventory texts before text |
201 | + content) |
202 | + 5) No chk_bytes |
203 | + """ |
204 | + |
205 | + def __init__(self, from_repository, to_format): |
206 | + super(KnitPackStreamSource, self).__init__(from_repository, to_format) |
207 | + self._text_keys = None |
208 | + self._text_fetch_order = 'unordered' |
209 | + |
210 | + def _get_filtered_inv_stream(self, revision_ids): |
211 | + from_repo = self.from_repository |
212 | + parent_ids = from_repo._find_parent_ids_of_revisions(revision_ids) |
213 | + parent_keys = [(p,) for p in parent_ids] |
214 | + find_text_keys = from_repo._find_text_key_references_from_xml_inventory_lines |
215 | + parent_text_keys = set(find_text_keys( |
216 | + from_repo._inventory_xml_lines_for_keys(parent_keys))) |
217 | + content_text_keys = set() |
218 | + knit = KnitVersionedFiles(None, None) |
219 | + factory = KnitPlainFactory() |
220 | + def find_text_keys_from_content(record): |
221 | + if record.storage_kind not in ('knit-delta-gz', 'knit-ft-gz'): |
222 | + raise ValueError("Unknown content storage kind for" |
223 | + " inventory text: %s" % (record.storage_kind,)) |
224 | + # It's a knit record, it has a _raw_record field (even if it was |
225 | + # reconstituted from a network stream). |
226 | + raw_data = record._raw_record |
227 | + # read the entire thing |
228 | + revision_id = record.key[-1] |
229 | + content, _ = knit._parse_record(revision_id, raw_data) |
230 | + if record.storage_kind == 'knit-delta-gz': |
231 | + line_iterator = factory.get_linedelta_content(content) |
232 | + elif record.storage_kind == 'knit-ft-gz': |
233 | + line_iterator = factory.get_fulltext_content(content) |
234 | + content_text_keys.update(find_text_keys( |
235 | + [(line, revision_id) for line in line_iterator])) |
236 | + revision_keys = [(r,) for r in revision_ids] |
237 | + def _filtered_inv_stream(): |
238 | + source_vf = from_repo.inventories |
239 | + stream = source_vf.get_record_stream(revision_keys, |
240 | + 'unordered', False) |
241 | + for record in stream: |
242 | + if record.storage_kind == 'absent': |
243 | + raise errors.NoSuchRevision(from_repo, record.key) |
244 | + find_text_keys_from_content(record) |
245 | + yield record |
246 | + self._text_keys = content_text_keys - parent_text_keys |
247 | + return ('inventories', _filtered_inv_stream()) |
248 | + |
249 | + def _get_text_stream(self): |
250 | + # Note: We know we don't have to handle adding root keys, because both |
251 | + # the source and target are the identical network name. |
252 | + text_stream = self.from_repository.texts.get_record_stream( |
253 | + self._text_keys, self._text_fetch_order, False) |
254 | + return ('texts', text_stream) |
255 | + |
256 | + def get_stream(self, search): |
257 | + revision_ids = search.get_keys() |
258 | + for stream_info in self._fetch_revision_texts(revision_ids): |
259 | + yield stream_info |
260 | + self._revision_keys = [(rev_id,) for rev_id in revision_ids] |
261 | + yield self._get_filtered_inv_stream(revision_ids) |
262 | + yield self._get_text_stream() |
263 | + |
264 | + |
265 | + |
266 | class RepositoryFormatPack(MetaDirRepositoryFormat): |
267 | """Format logic for pack structured repositories. |
268 | |
269 | |
270 | === modified file 'bzrlib/repository.py' |
271 | --- bzrlib/repository.py 2009-06-12 01:11:00 +0000 |
272 | +++ bzrlib/repository.py 2009-06-16 02:36:36 +0000 |
273 | @@ -1919,29 +1919,25 @@ |
274 | yield line, revid |
275 | |
276 | def _find_file_ids_from_xml_inventory_lines(self, line_iterator, |
277 | - revision_ids): |
278 | + revision_keys): |
279 | """Helper routine for fileids_altered_by_revision_ids. |
280 | |
281 | This performs the translation of xml lines to revision ids. |
282 | |
283 | :param line_iterator: An iterator of lines, origin_version_id |
284 | - :param revision_ids: The revision ids to filter for. This should be a |
285 | + :param revision_keys: The revision ids to filter for. This should be a |
286 | set or other type which supports efficient __contains__ lookups, as |
287 | - the revision id from each parsed line will be looked up in the |
288 | - revision_ids filter. |
289 | + the revision key from each parsed line will be looked up in the |
290 | + revision_keys filter. |
291 | :return: a dictionary mapping altered file-ids to an iterable of |
292 | revision_ids. Each altered file-ids has the exact revision_ids that |
293 | altered it listed explicitly. |
294 | """ |
295 | seen = set(self._find_text_key_references_from_xml_inventory_lines( |
296 | line_iterator).iterkeys()) |
297 | - # Note that revision_ids are revision keys. |
298 | - parent_maps = self.revisions.get_parent_map(revision_ids) |
299 | - parents = set() |
300 | - map(parents.update, parent_maps.itervalues()) |
301 | - parents.difference_update(revision_ids) |
302 | + parent_keys = self._find_parent_keys_of_revisions(revision_keys) |
303 | parent_seen = set(self._find_text_key_references_from_xml_inventory_lines( |
304 | - self._inventory_xml_lines_for_keys(parents))) |
305 | + self._inventory_xml_lines_for_keys(parent_keys))) |
306 | new_keys = seen - parent_seen |
307 | result = {} |
308 | setdefault = result.setdefault |
309 | @@ -1949,6 +1945,33 @@ |
310 | setdefault(key[0], set()).add(key[-1]) |
311 | return result |
312 | |
313 | + def _find_parent_ids_of_revisions(self, revision_ids): |
314 | + """Find all parent ids that are mentioned in the revision graph. |
315 | + |
316 | + :return: set of revisions that are parents of revision_ids which are |
317 | + not part of revision_ids themselves |
318 | + """ |
319 | + parent_map = self.get_parent_map(revision_ids) |
320 | + parent_ids = set() |
321 | + map(parent_ids.update, parent_map.itervalues()) |
322 | + parent_ids.difference_update(revision_ids) |
323 | + parent_ids.discard(_mod_revision.NULL_REVISION) |
324 | + return parent_ids |
325 | + |
326 | + def _find_parent_keys_of_revisions(self, revision_keys): |
327 | + """Similar to _find_parent_ids_of_revisions, but used with keys. |
328 | + |
329 | + :param revision_keys: An iterable of revision_keys. |
330 | + :return: The parents of all revision_keys that are not already in |
331 | + revision_keys |
332 | + """ |
333 | + parent_map = self.revisions.get_parent_map(revision_keys) |
334 | + parent_keys = set() |
335 | + map(parent_keys.update, parent_map.itervalues()) |
336 | + parent_keys.difference_update(revision_keys) |
337 | + parent_keys.discard(_mod_revision.NULL_REVISION) |
338 | + return parent_keys |
339 | + |
340 | def fileids_altered_by_revision_ids(self, revision_ids, _inv_weave=None): |
341 | """Find the file ids and versions affected by revisions. |
342 | |
343 | @@ -3418,144 +3441,6 @@ |
344 | return self.source.revision_ids_to_search_result(result_set) |
345 | |
346 | |
347 | -class InterPackRepo(InterSameDataRepository): |
348 | - """Optimised code paths between Pack based repositories.""" |
349 | - |
350 | - @classmethod |
351 | - def _get_repo_format_to_test(self): |
352 | - from bzrlib.repofmt import pack_repo |
353 | - return pack_repo.RepositoryFormatKnitPack6RichRoot() |
354 | - |
355 | - @staticmethod |
356 | - def is_compatible(source, target): |
357 | - """Be compatible with known Pack formats. |
358 | - |
359 | - We don't test for the stores being of specific types because that |
360 | - could lead to confusing results, and there is no need to be |
361 | - overly general. |
362 | - |
363 | - InterPackRepo does not support CHK based repositories. |
364 | - """ |
365 | - from bzrlib.repofmt.pack_repo import RepositoryFormatPack |
366 | - from bzrlib.repofmt.groupcompress_repo import RepositoryFormatCHK1 |
367 | - try: |
368 | - are_packs = (isinstance(source._format, RepositoryFormatPack) and |
369 | - isinstance(target._format, RepositoryFormatPack)) |
370 | - not_packs = (isinstance(source._format, RepositoryFormatCHK1) or |
371 | - isinstance(target._format, RepositoryFormatCHK1)) |
372 | - except AttributeError: |
373 | - return False |
374 | - if not_packs or not are_packs: |
375 | - return False |
376 | - return InterRepository._same_model(source, target) |
377 | - |
378 | - @needs_write_lock |
379 | - def fetch(self, revision_id=None, pb=None, find_ghosts=False, |
380 | - fetch_spec=None): |
381 | - """See InterRepository.fetch().""" |
382 | - if (len(self.source._fallback_repositories) > 0 or |
383 | - len(self.target._fallback_repositories) > 0): |
384 | - # The pack layer is not aware of fallback repositories, so when |
385 | - # fetching from a stacked repository or into a stacked repository |
386 | - # we use the generic fetch logic which uses the VersionedFiles |
387 | - # attributes on repository. |
388 | - from bzrlib.fetch import RepoFetcher |
389 | - fetcher = RepoFetcher(self.target, self.source, revision_id, |
390 | - pb, find_ghosts, fetch_spec=fetch_spec) |
391 | - if fetch_spec is not None: |
392 | - if len(list(fetch_spec.heads)) != 1: |
393 | - raise AssertionError( |
394 | - "InterPackRepo.fetch doesn't support " |
395 | - "fetching multiple heads yet.") |
396 | - revision_id = list(fetch_spec.heads)[0] |
397 | - fetch_spec = None |
398 | - if revision_id is None: |
399 | - # TODO: |
400 | - # everything to do - use pack logic |
401 | - # to fetch from all packs to one without |
402 | - # inventory parsing etc, IFF nothing to be copied is in the target. |
403 | - # till then: |
404 | - source_revision_ids = frozenset(self.source.all_revision_ids()) |
405 | - revision_ids = source_revision_ids - \ |
406 | - frozenset(self.target.get_parent_map(source_revision_ids)) |
407 | - revision_keys = [(revid,) for revid in revision_ids] |
408 | - index = self.target._pack_collection.revision_index.combined_index |
409 | - present_revision_ids = set(item[1][0] for item in |
410 | - index.iter_entries(revision_keys)) |
411 | - revision_ids = set(revision_ids) - present_revision_ids |
412 | - # implementing the TODO will involve: |
413 | - # - detecting when all of a pack is selected |
414 | - # - avoiding as much as possible pre-selection, so the |
415 | - # more-core routines such as create_pack_from_packs can filter in |
416 | - # a just-in-time fashion. (though having a HEADS list on a |
417 | - # repository might make this a lot easier, because we could |
418 | - # sensibly detect 'new revisions' without doing a full index scan. |
419 | - elif _mod_revision.is_null(revision_id): |
420 | - # nothing to do: |
421 | - return (0, []) |
422 | - else: |
423 | - revision_ids = self.search_missing_revision_ids(revision_id, |
424 | - find_ghosts=find_ghosts).get_keys() |
425 | - if len(revision_ids) == 0: |
426 | - return (0, []) |
427 | - return self._pack(self.source, self.target, revision_ids) |
428 | - |
429 | - def _pack(self, source, target, revision_ids): |
430 | - from bzrlib.repofmt.pack_repo import Packer |
431 | - packs = source._pack_collection.all_packs() |
432 | - pack = Packer(self.target._pack_collection, packs, '.fetch', |
433 | - revision_ids).pack() |
434 | - if pack is not None: |
435 | - self.target._pack_collection._save_pack_names() |
436 | - copied_revs = pack.get_revision_count() |
437 | - # Trigger an autopack. This may duplicate effort as we've just done |
438 | - # a pack creation, but for now it is simpler to think about as |
439 | - # 'upload data, then repack if needed'. |
440 | - self.target._pack_collection.autopack() |
441 | - return (copied_revs, []) |
442 | - else: |
443 | - return (0, []) |
444 | - |
445 | - @needs_read_lock |
446 | - def search_missing_revision_ids(self, revision_id=None, find_ghosts=True): |
447 | - """See InterRepository.missing_revision_ids(). |
448 | - |
449 | - :param find_ghosts: Find ghosts throughout the ancestry of |
450 | - revision_id. |
451 | - """ |
452 | - if not find_ghosts and revision_id is not None: |
453 | - return self._walk_to_common_revisions([revision_id]) |
454 | - elif revision_id is not None: |
455 | - # Find ghosts: search for revisions pointing from one repository to |
456 | - # the other, and vice versa, anywhere in the history of revision_id. |
457 | - graph = self.target.get_graph(other_repository=self.source) |
458 | - searcher = graph._make_breadth_first_searcher([revision_id]) |
459 | - found_ids = set() |
460 | - while True: |
461 | - try: |
462 | - next_revs, ghosts = searcher.next_with_ghosts() |
463 | - except StopIteration: |
464 | - break |
465 | - if revision_id in ghosts: |
466 | - raise errors.NoSuchRevision(self.source, revision_id) |
467 | - found_ids.update(next_revs) |
468 | - found_ids.update(ghosts) |
469 | - found_ids = frozenset(found_ids) |
470 | - # Double query here: should be able to avoid this by changing the |
471 | - # graph api further. |
472 | - result_set = found_ids - frozenset( |
473 | - self.target.get_parent_map(found_ids)) |
474 | - else: |
475 | - source_ids = self.source.all_revision_ids() |
476 | - # source_ids is the worst possible case we may need to pull. |
477 | - # now we want to filter source_ids against what we actually |
478 | - # have in target, but don't try to check for existence where we know |
479 | - # we do not have a revision as that would be pointless. |
480 | - target_ids = set(self.target.all_revision_ids()) |
481 | - result_set = set(source_ids).difference(target_ids) |
482 | - return self.source.revision_ids_to_search_result(result_set) |
483 | - |
484 | - |
485 | class InterDifferingSerializer(InterRepository): |
486 | |
487 | @classmethod |
488 | @@ -3836,7 +3721,6 @@ |
489 | InterRepository.register_optimiser(InterSameDataRepository) |
490 | InterRepository.register_optimiser(InterWeaveRepo) |
491 | InterRepository.register_optimiser(InterKnitRepo) |
492 | -InterRepository.register_optimiser(InterPackRepo) |
493 | |
494 | |
495 | class CopyConverter(object): |
496 | |
497 | === modified file 'bzrlib/tests/test_pack_repository.py' |
498 | --- bzrlib/tests/test_pack_repository.py 2009-06-10 03:56:49 +0000 |
499 | +++ bzrlib/tests/test_pack_repository.py 2009-06-16 02:36:36 +0000 |
500 | @@ -38,6 +38,10 @@ |
501 | upgrade, |
502 | workingtree, |
503 | ) |
504 | +from bzrlib.repofmt import ( |
505 | + pack_repo, |
506 | + groupcompress_repo, |
507 | + ) |
508 | from bzrlib.repofmt.groupcompress_repo import RepositoryFormatCHK1 |
509 | from bzrlib.smart import ( |
510 | client, |
511 | @@ -556,58 +560,43 @@ |
512 | missing_ghost.get_inventory, 'ghost') |
513 | |
514 | def make_write_ready_repo(self): |
515 | - repo = self.make_repository('.', format=self.get_format()) |
516 | + format = self.get_format() |
517 | + if isinstance(format.repository_format, RepositoryFormatCHK1): |
518 | + raise TestNotApplicable("No missing compression parents") |
519 | + repo = self.make_repository('.', format=format) |
520 | repo.lock_write() |
521 | + self.addCleanup(repo.unlock) |
522 | repo.start_write_group() |
523 | + self.addCleanup(repo.abort_write_group) |
524 | return repo |
525 | |
526 | def test_missing_inventories_compression_parent_prevents_commit(self): |
527 | repo = self.make_write_ready_repo() |
528 | key = ('junk',) |
529 | - if not getattr(repo.inventories._index, '_missing_compression_parents', |
530 | - None): |
531 | - raise TestSkipped("No missing compression parents") |
532 | repo.inventories._index._missing_compression_parents.add(key) |
533 | self.assertRaises(errors.BzrCheckError, repo.commit_write_group) |
534 | self.assertRaises(errors.BzrCheckError, repo.commit_write_group) |
535 | - repo.abort_write_group() |
536 | - repo.unlock() |
537 | |
538 | def test_missing_revisions_compression_parent_prevents_commit(self): |
539 | repo = self.make_write_ready_repo() |
540 | key = ('junk',) |
541 | - if not getattr(repo.inventories._index, '_missing_compression_parents', |
542 | - None): |
543 | - raise TestSkipped("No missing compression parents") |
544 | repo.revisions._index._missing_compression_parents.add(key) |
545 | self.assertRaises(errors.BzrCheckError, repo.commit_write_group) |
546 | self.assertRaises(errors.BzrCheckError, repo.commit_write_group) |
547 | - repo.abort_write_group() |
548 | - repo.unlock() |
549 | |
550 | def test_missing_signatures_compression_parent_prevents_commit(self): |
551 | repo = self.make_write_ready_repo() |
552 | key = ('junk',) |
553 | - if not getattr(repo.inventories._index, '_missing_compression_parents', |
554 | - None): |
555 | - raise TestSkipped("No missing compression parents") |
556 | repo.signatures._index._missing_compression_parents.add(key) |
557 | self.assertRaises(errors.BzrCheckError, repo.commit_write_group) |
558 | self.assertRaises(errors.BzrCheckError, repo.commit_write_group) |
559 | - repo.abort_write_group() |
560 | - repo.unlock() |
561 | |
562 | def test_missing_text_compression_parent_prevents_commit(self): |
563 | repo = self.make_write_ready_repo() |
564 | key = ('some', 'junk') |
565 | - if not getattr(repo.inventories._index, '_missing_compression_parents', |
566 | - None): |
567 | - raise TestSkipped("No missing compression parents") |
568 | repo.texts._index._missing_compression_parents.add(key) |
569 | self.assertRaises(errors.BzrCheckError, repo.commit_write_group) |
570 | e = self.assertRaises(errors.BzrCheckError, repo.commit_write_group) |
571 | - repo.abort_write_group() |
572 | - repo.unlock() |
573 | |
574 | def test_supports_external_lookups(self): |
575 | repo = self.make_repository('.', format=self.get_format()) |
576 | |
577 | === modified file 'bzrlib/tests/test_repository.py' |
578 | --- bzrlib/tests/test_repository.py 2009-06-10 03:56:49 +0000 |
579 | +++ bzrlib/tests/test_repository.py 2009-06-16 02:36:36 +0000 |
580 | @@ -31,7 +31,10 @@ |
581 | UnknownFormatError, |
582 | UnsupportedFormatError, |
583 | ) |
584 | -from bzrlib import graph |
585 | +from bzrlib import ( |
586 | + graph, |
587 | + tests, |
588 | + ) |
589 | from bzrlib.branchbuilder import BranchBuilder |
590 | from bzrlib.btree_index import BTreeBuilder, BTreeGraphIndex |
591 | from bzrlib.index import GraphIndex, InMemoryGraphIndex |
592 | @@ -685,6 +688,147 @@ |
593 | self.assertEqual(65536, |
594 | inv.parent_id_basename_to_file_id._root_node.maximum_size) |
595 | |
596 | + def test_stream_source_to_gc(self): |
597 | + source = self.make_repository('source', format='development6-rich-root') |
598 | + target = self.make_repository('target', format='development6-rich-root') |
599 | + stream = source._get_source(target._format) |
600 | + self.assertIsInstance(stream, groupcompress_repo.GroupCHKStreamSource) |
601 | + |
602 | + def test_stream_source_to_non_gc(self): |
603 | + source = self.make_repository('source', format='development6-rich-root') |
604 | + target = self.make_repository('target', format='rich-root-pack') |
605 | + stream = source._get_source(target._format) |
606 | + # We don't want the child GroupCHKStreamSource |
607 | + self.assertIs(type(stream), repository.StreamSource) |
608 | + |
609 | + def test_get_stream_for_missing_keys_includes_all_chk_refs(self): |
610 | + source_builder = self.make_branch_builder('source', |
611 | + format='development6-rich-root') |
612 | + # We have to build a fairly large tree, so that we are sure the chk |
613 | + # pages will have split into multiple pages. |
614 | + entries = [('add', ('', 'a-root-id', 'directory', None))] |
615 | + for i in 'abcdefghijklmnopqrstuvwxyz123456789': |
616 | + for j in 'abcdefghijklmnopqrstuvwxyz123456789': |
617 | + fname = i + j |
618 | + fid = fname + '-id' |
619 | + content = 'content for %s\n' % (fname,) |
620 | + entries.append(('add', (fname, fid, 'file', content))) |
621 | + source_builder.start_series() |
622 | + source_builder.build_snapshot('rev-1', None, entries) |
623 | + # Now change a few of them, so we get a few new pages for the second |
624 | + # revision |
625 | + source_builder.build_snapshot('rev-2', ['rev-1'], [ |
626 | + ('modify', ('aa-id', 'new content for aa-id\n')), |
627 | + ('modify', ('cc-id', 'new content for cc-id\n')), |
628 | + ('modify', ('zz-id', 'new content for zz-id\n')), |
629 | + ]) |
630 | + source_builder.finish_series() |
631 | + source_branch = source_builder.get_branch() |
632 | + source_branch.lock_read() |
633 | + self.addCleanup(source_branch.unlock) |
634 | + target = self.make_repository('target', format='development6-rich-root') |
635 | + source = source_branch.repository._get_source(target._format) |
636 | + self.assertIsInstance(source, groupcompress_repo.GroupCHKStreamSource) |
637 | + |
638 | + # On a regular pass, getting the inventories and chk pages for rev-2 |
639 | + # would only get the newly created chk pages |
640 | + search = graph.SearchResult(set(['rev-2']), set(['rev-1']), 1, |
641 | + set(['rev-2'])) |
642 | + simple_chk_records = [] |
643 | + for vf_name, substream in source.get_stream(search): |
644 | + if vf_name == 'chk_bytes': |
645 | + for record in substream: |
646 | + simple_chk_records.append(record.key) |
647 | + else: |
648 | + for _ in substream: |
649 | + continue |
650 | + # 3 pages, the root (InternalNode), + 2 pages which actually changed |
651 | + self.assertEqual([('sha1:91481f539e802c76542ea5e4c83ad416bf219f73',), |
652 | + ('sha1:4ff91971043668583985aec83f4f0ab10a907d3f',), |
653 | + ('sha1:81e7324507c5ca132eedaf2d8414ee4bb2226187',), |
654 | + ('sha1:b101b7da280596c71a4540e9a1eeba8045985ee0',)], |
655 | + simple_chk_records) |
656 | + # Now, when we do a similar call using 'get_stream_for_missing_keys' |
657 | + # we should get a much larger set of pages. |
658 | + missing = [('inventories', 'rev-2')] |
659 | + full_chk_records = [] |
660 | + for vf_name, substream in source.get_stream_for_missing_keys(missing): |
661 | + if vf_name == 'inventories': |
662 | + for record in substream: |
663 | + self.assertEqual(('rev-2',), record.key) |
664 | + elif vf_name == 'chk_bytes': |
665 | + for record in substream: |
666 | + full_chk_records.append(record.key) |
667 | + else: |
668 | + self.fail('Should not be getting a stream of %s' % (vf_name,)) |
669 | + # We have 257 records now. This is because we have 1 root page, and 256 |
670 | + # leaf pages in a complete listing. |
671 | + self.assertEqual(257, len(full_chk_records)) |
672 | + self.assertSubset(simple_chk_records, full_chk_records) |
673 | + |
674 | + |
675 | +class TestKnitPackStreamSource(tests.TestCaseWithMemoryTransport): |
676 | + |
677 | + def test_source_to_exact_pack_092(self): |
678 | + source = self.make_repository('source', format='pack-0.92') |
679 | + target = self.make_repository('target', format='pack-0.92') |
680 | + stream_source = source._get_source(target._format) |
681 | + self.assertIsInstance(stream_source, pack_repo.KnitPackStreamSource) |
682 | + |
683 | + def test_source_to_exact_pack_rich_root_pack(self): |
684 | + source = self.make_repository('source', format='rich-root-pack') |
685 | + target = self.make_repository('target', format='rich-root-pack') |
686 | + stream_source = source._get_source(target._format) |
687 | + self.assertIsInstance(stream_source, pack_repo.KnitPackStreamSource) |
688 | + |
689 | + def test_source_to_exact_pack_19(self): |
690 | + source = self.make_repository('source', format='1.9') |
691 | + target = self.make_repository('target', format='1.9') |
692 | + stream_source = source._get_source(target._format) |
693 | + self.assertIsInstance(stream_source, pack_repo.KnitPackStreamSource) |
694 | + |
695 | + def test_source_to_exact_pack_19_rich_root(self): |
696 | + source = self.make_repository('source', format='1.9-rich-root') |
697 | + target = self.make_repository('target', format='1.9-rich-root') |
698 | + stream_source = source._get_source(target._format) |
699 | + self.assertIsInstance(stream_source, pack_repo.KnitPackStreamSource) |
700 | + |
701 | + def test_source_to_remote_exact_pack_19(self): |
702 | + trans = self.make_smart_server('target') |
703 | + trans.ensure_base() |
704 | + source = self.make_repository('source', format='1.9') |
705 | + target = self.make_repository('target', format='1.9') |
706 | + target = repository.Repository.open(trans.base) |
707 | + stream_source = source._get_source(target._format) |
708 | + self.assertIsInstance(stream_source, pack_repo.KnitPackStreamSource) |
709 | + |
710 | + def test_stream_source_to_non_exact(self): |
711 | + source = self.make_repository('source', format='pack-0.92') |
712 | + target = self.make_repository('target', format='1.9') |
713 | + stream = source._get_source(target._format) |
714 | + self.assertIs(type(stream), repository.StreamSource) |
715 | + |
716 | + def test_stream_source_to_non_exact_rich_root(self): |
717 | + source = self.make_repository('source', format='1.9') |
718 | + target = self.make_repository('target', format='1.9-rich-root') |
719 | + stream = source._get_source(target._format) |
720 | + self.assertIs(type(stream), repository.StreamSource) |
721 | + |
722 | + def test_source_to_remote_non_exact_pack_19(self): |
723 | + trans = self.make_smart_server('target') |
724 | + trans.ensure_base() |
725 | + source = self.make_repository('source', format='1.9') |
726 | + target = self.make_repository('target', format='1.6') |
727 | + target = repository.Repository.open(trans.base) |
728 | + stream_source = source._get_source(target._format) |
729 | + self.assertIs(type(stream_source), repository.StreamSource) |
730 | + |
731 | + def test_stream_source_to_knit(self): |
732 | + source = self.make_repository('source', format='pack-0.92') |
733 | + target = self.make_repository('target', format='dirstate') |
734 | + stream = source._get_source(target._format) |
735 | + self.assertIs(type(stream), repository.StreamSource) |
736 | + |
737 | |
738 | class TestDevelopment6FindParentIdsOfRevisions(TestCaseWithTransport): |
739 | """Tests for _find_parent_ids_of_revisions.""" |
740 | @@ -1204,84 +1348,3 @@ |
741 | self.assertTrue(new_pack.inventory_index._optimize_for_size) |
742 | self.assertTrue(new_pack.text_index._optimize_for_size) |
743 | self.assertTrue(new_pack.signature_index._optimize_for_size) |
744 | - |
745 | - |
746 | -class TestGCCHKPackCollection(TestCaseWithTransport): |
747 | - |
748 | - def test_stream_source_to_gc(self): |
749 | - source = self.make_repository('source', format='development6-rich-root') |
750 | - target = self.make_repository('target', format='development6-rich-root') |
751 | - stream = source._get_source(target._format) |
752 | - self.assertIsInstance(stream, groupcompress_repo.GroupCHKStreamSource) |
753 | - |
754 | - def test_stream_source_to_non_gc(self): |
755 | - source = self.make_repository('source', format='development6-rich-root') |
756 | - target = self.make_repository('target', format='rich-root-pack') |
757 | - stream = source._get_source(target._format) |
758 | - # We don't want the child GroupCHKStreamSource |
759 | - self.assertIs(type(stream), repository.StreamSource) |
760 | - |
761 | - def test_get_stream_for_missing_keys_includes_all_chk_refs(self): |
762 | - source_builder = self.make_branch_builder('source', |
763 | - format='development6-rich-root') |
764 | - # We have to build a fairly large tree, so that we are sure the chk |
765 | - # pages will have split into multiple pages. |
766 | - entries = [('add', ('', 'a-root-id', 'directory', None))] |
767 | - for i in 'abcdefghijklmnopqrstuvwxyz123456789': |
768 | - for j in 'abcdefghijklmnopqrstuvwxyz123456789': |
769 | - fname = i + j |
770 | - fid = fname + '-id' |
771 | - content = 'content for %s\n' % (fname,) |
772 | - entries.append(('add', (fname, fid, 'file', content))) |
773 | - source_builder.start_series() |
774 | - source_builder.build_snapshot('rev-1', None, entries) |
775 | - # Now change a few of them, so we get a few new pages for the second |
776 | - # revision |
777 | - source_builder.build_snapshot('rev-2', ['rev-1'], [ |
778 | - ('modify', ('aa-id', 'new content for aa-id\n')), |
779 | - ('modify', ('cc-id', 'new content for cc-id\n')), |
780 | - ('modify', ('zz-id', 'new content for zz-id\n')), |
781 | - ]) |
782 | - source_builder.finish_series() |
783 | - source_branch = source_builder.get_branch() |
784 | - source_branch.lock_read() |
785 | - self.addCleanup(source_branch.unlock) |
786 | - target = self.make_repository('target', format='development6-rich-root') |
787 | - source = source_branch.repository._get_source(target._format) |
788 | - self.assertIsInstance(source, groupcompress_repo.GroupCHKStreamSource) |
789 | - |
790 | - # On a regular pass, getting the inventories and chk pages for rev-2 |
791 | - # would only get the newly created chk pages |
792 | - search = graph.SearchResult(set(['rev-2']), set(['rev-1']), 1, |
793 | - set(['rev-2'])) |
794 | - simple_chk_records = [] |
795 | - for vf_name, substream in source.get_stream(search): |
796 | - if vf_name == 'chk_bytes': |
797 | - for record in substream: |
798 | - simple_chk_records.append(record.key) |
799 | - else: |
800 | - for _ in substream: |
801 | - continue |
802 | - # 3 pages, the root (InternalNode), + 2 pages which actually changed |
803 | - self.assertEqual([('sha1:91481f539e802c76542ea5e4c83ad416bf219f73',), |
804 | - ('sha1:4ff91971043668583985aec83f4f0ab10a907d3f',), |
805 | - ('sha1:81e7324507c5ca132eedaf2d8414ee4bb2226187',), |
806 | - ('sha1:b101b7da280596c71a4540e9a1eeba8045985ee0',)], |
807 | - simple_chk_records) |
808 | - # Now, when we do a similar call using 'get_stream_for_missing_keys' |
809 | - # we should get a much larger set of pages. |
810 | - missing = [('inventories', 'rev-2')] |
811 | - full_chk_records = [] |
812 | - for vf_name, substream in source.get_stream_for_missing_keys(missing): |
813 | - if vf_name == 'inventories': |
814 | - for record in substream: |
815 | - self.assertEqual(('rev-2',), record.key) |
816 | - elif vf_name == 'chk_bytes': |
817 | - for record in substream: |
818 | - full_chk_records.append(record.key) |
819 | - else: |
820 | - self.fail('Should not be getting a stream of %s' % (vf_name,)) |
821 | - # We have 257 records now. This is because we have 1 root page, and 256 |
822 | - # leaf pages in a complete listing. |
823 | - self.assertEqual(257, len(full_chk_records)) |
824 | - self.assertSubset(simple_chk_records, full_chk_records) |
This proposal changes how pack <=> pack fetching is triggered.
It removes the InterPackRepo optimizer (which uses Packer internally) in favor of a new KnitPackStreamSource.
The new source is a streamlined version of StreamSource that doesn't attempt to handle all the different cross-format issues; it only supports exact-format fetching.
Specifically, it sends data in the order (signatures, revisions, inventories, texts), since it knows insertion is atomic.
It walks the inventory pages a single time and extracts the text keys as the fetch is going, rather than in a separate pre-read pass. This is a moderate win for dumb-transport fetching (versus StreamSource, though not versus InterPackRepo) because it avoids reading the inventory pages twice.
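To make that concrete, here is a toy sketch of the substream ordering and the single-pass text-key harvesting. It is not the bzrlib implementation (the real code is KnitPackStreamSource.get_stream and _get_filtered_inv_stream in the diff above); the function name, the dict-based fake records, and the sample data are all invented for illustration.

```python
# A toy model of the substream ordering described above -- NOT the bzrlib
# implementation. `inventories` maps a revision id to a fake record that
# carries the text keys it references; in the real source those keys are
# parsed out of the XML inventory records as they stream past.
def toy_get_stream(revision_ids, inventories, texts):
    yield 'signatures', iter(())          # stand-ins for the revision-level
    yield 'revisions', iter(())           # substreams, which come first
    harvested = set()

    def filtered_inv_stream():
        # Single pass over the inventories, harvesting text keys on the fly.
        for rev_id in revision_ids:
            record = inventories[rev_id]
            harvested.update(record['text_keys'])
            yield record

    yield 'inventories', filtered_inv_stream()

    def text_stream():
        # `harvested` is only complete after the inventory substream has been
        # consumed, which is why texts are the last substream.
        for key in sorted(harvested):
            yield texts[key]

    yield 'texts', text_stream()

# Minimal usage with made-up data:
inventories = {'rev-1': {'text_keys': {('file-a', 'rev-1')}}}
texts = {('file-a', 'rev-1'): 'contents of file-a\n'}
for name, substream in toy_get_stream(['rev-1'], inventories, texts):
    print(name, list(substream))
```

The real implementation additionally subtracts the text keys already referenced by the parent inventories, which is the bug fix described next.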
It also fixes a bug in the current InterPackRepo code. Namely, the Packer code was recently changed to make sure that all referenced file keys are fetched, rather than only the ones mentioned in the specific revisions being fetched. This was done at about the same time as the updates to fileids_altered_by..., but in the process it was not updated to also read the parent inventories and remove their text keys.
This meant that if you got a fulltext inventory, you would end up copying the data for all texts in that revision, whether they were modified or not. For bzr.dev, that often meant downloading ~3MB of extra data for a small change. I considered fixing Packer to handle this, but I figured we wanted to move to StreamSource as the one-and-only method for fetching anyway.
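A tiny, self-contained illustration of that fix (the revision and file keys are made up; the set difference mirrors the `content_text_keys - parent_text_keys` line in the diff):

```python
# Keys referenced by the fetched revision's (fulltext) inventory:
content_text_keys = {('file-a', 'rev-2'), ('file-b', 'rev-1'), ('file-c', 'rev-2')}
# Keys referenced by the parent inventories (already present in the target):
parent_text_keys = {('file-b', 'rev-1')}
# Only the difference needs to be transmitted:
text_keys_to_send = content_text_keys - parent_text_keys
print(sorted(text_keys_to_send))   # [('file-a', 'rev-2'), ('file-c', 'rev-2')]
```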
I also made some changes to make it clearer when a set of something holds *keys* (tuples) and when it holds *ids* (plain strings).
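A minimal illustration of that convention, with made-up revision ids; both conversions appear throughout the diff:

```python
revision_ids = ['rev-1', 'rev-2']              # ids: plain strings
revision_keys = [(r,) for r in revision_ids]   # keys: 1-tuples, e.g. ('rev-1',)
# Going from keys back to ids takes the last element of each tuple:
assert [k[-1] for k in revision_keys] == revision_ids
```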
I also moved some of the helpers that were added as part of the gc-stacking patch into the base Repository class, so that I could simply re-use them.