Merge lp:~spiv/bzr/inventory-delta into lp:~bzr/bzr/trunk-old

Proposed by Andrew Bennetts
Status: Superseded
Proposed branch: lp:~spiv/bzr/inventory-delta
Merge into: lp:~bzr/bzr/trunk-old
Diff against target: 2656 lines
To merge this branch: bzr merge lp:~spiv/bzr/inventory-delta
Reviewer: bzr-core (status: Pending)
Review via email: mp+8860@code.launchpad.net

This proposal has been superseded by a proposal from 2009-07-22.

Andrew Bennetts (spiv) wrote:

This is a pretty big patch. It does lots of things:

 * adds new insert_stream and get_stream verbs
 * adds de/serialization of inventory-delta records on the network (see
   the sketch after this list)
 * fixes rich-root generation in StreamSource
 * adds a bunch of new scenarios to per_interrepository tests
 * fixes some 'pack already exist' bugs for packing a single GC pack (i.e. when
   the new pack is already optimal).
 * improves the inventory_delta module a little
 * various miscellaneous fixes and new tests that are hopefully self-evident
 * and, most controversially, removes InterDifferingSerializer.
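
To make the de/serialization item above concrete, here is a minimal
sketch of driving the updated bzrlib.inventory_delta API, using the names
from the preview diff below; the delta contents and flag values are
illustrative, not taken from real use:

from bzrlib.inventory import InventoryDirectory
from bzrlib.inventory_delta import InventoryDeltaSerializer

# Build a one-item delta that adds a versioned root directory.
root = InventoryDirectory('tree-root-id', '', None)
root.revision = 'rev-1'
delta = [(None, '', 'tree-root-id', root)]

serializer = InventoryDeltaSerializer()
# This branch moves the flags out of __init__ into require_flags().
serializer.require_flags(versioned_root=True, tree_references=True)
# old_name may be NULL_REVISION ('null:') when the delta carries the
# whole inventory contents.
lines = serializer.delta_to_lines('null:', 'rev-1', delta)

# parse_text_bytes now also returns the flags it found in the header.
parser = InventoryDeltaSerializer()
(parent_id, new_id, versioned_root, tree_references,
 parsed_delta) = parser.parse_text_bytes(''.join(lines))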

From John's mail a while back there were a bunch of issues with removing IDS. I
think the outstanding ones are:

> 1) Incremental updates. IDS converts batches of 100 revs at a time,
> which also triggers autopacks at 1k revs. Streaming fetch is currently
> an all-or-nothing, which isn't appropriate (IMO) for conversions.
> Consider that conversion can take *days*, it is important to have
> something that can be stopped and resumed.
>
> 2) Also, auto-packing as you go avoids the case you ran into, where bzr
> bloats to 2.4GB before packing back to 25MB. We know the new format is
> even more sensitive to packing efficiency. Not to mention that a single
> big-stream generates a single large pack, it isn't directly obvious that
> we are being so inefficient.

i.e. performance concerns.
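
For reference, the batching behaviour John describes above, condensed
into a sketch from the InterDifferingSerializer code this branch removes
(see _fetch_all_revisions in the preview diff; fetch_batch stands in for
the per-batch copying of texts, deltas and revisions):

def fetch_all_revisions(target, revision_ids, basis_id, cache, fetch_batch):
    # Convert in batches of 100 revisions, committing a write group after
    # each batch. Each commit puts a pack on disk, so the conversion can
    # be interrupted and resumed, and the normal autopack heuristics get
    # a chance to fire as packs accumulate.
    batch_size = 100
    hints = []
    for offset in range(0, len(revision_ids), batch_size):
        target.start_write_group()
        try:
            batch = revision_ids[offset:offset + batch_size]
            basis_id = fetch_batch(batch, basis_id, cache)
        except:
            target.abort_write_group()
            raise
        else:
            hint = target.commit_write_group()
            if hint:
                hints.extend(hint)
    if hints and target._format.pack_compresses:
        target.pack(hint=hints)  # one final repack of the accumulated packs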

The streaming code now does the conversion in a way pretty similar to
how IDS did it, but probably still different enough that we will want to
measure the impact of this change. I'm definitely concerned about case 2,
the lack of packing as you go, although perhaps the degree of bloat is
reduced by using semantic inventory-delta records?

The reason why I eventually deleted IDS was that it was just too
burdensome to keep two code paths alive, thoroughly tested, and correct.
For instance, if we simply reinstated IDS for local-only fetches then
most of the test suite, including the relevant interrepo tests, would
only exercise IDS. Also, IDS turned out to have a bug when used on a
stacked repository, which the expanded test suite in this branch revealed
(I've forgotten the details, but can dig them up if you like). It didn't
seem worth the hassle of fixing IDS when I already had a working
implementation.

I'm certainly open to reinstating IDS if it's the most expedient way to have
reasonable local performance for upgrades, but I thought I'd try to be bold and
see if we could just live without the extra complexity. Maybe we can improve
performance of streaming rather than resurrect IDS?

-Andrew.

John A Meinel (jameinel) wrote:

Andrew Bennetts wrote:
> Andrew Bennetts has proposed merging lp:~spiv/bzr/inventory-delta into lp:bzr.
>
> Requested reviews:
> bzr-core (bzr-core)
>
> This is a pretty big patch. It does lots of things:
>
> * adds new insert_stream and get_stream verbs
> * adds de/serialization of inventory-delta records on the network
> * fixes rich-root generation in StreamSource
> * adds a bunch of new scenarios to per_interrepository tests
> * fixes some 'pack already exist' bugs for packing a single GC pack (i.e. when
> the new pack is already optimal).
> * improves the inventory_delta module a little
> * various miscellaneous fixes and new tests that are hopefully self-evident
> * and, most controversially, removes InterDifferingSerializer.
>
> From John's mail a while back there were a bunch of issues with removing IDS. I
> think the outstanding ones are:
>
>> 1) Incremental updates. IDS converts batches of 100 revs at a time,
>> which also triggers autopacks at 1k revs. Streaming fetch is currently
>> an all-or-nothing, which isn't appropriate (IMO) for conversions.
>> Consider that conversion can take *days*, it is important to have
>> something that can be stopped and resumed.

It also picks out the 'optimal' delta by computing many different ones
and keeping whichever one is 'smallest'. For local conversions, the time
to compute 2-3 deltas was much smaller than the time to apply an
inefficient delta, as the sketch below shows.
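
Condensed from IDS._get_delta_for_revision in the preview diff below,
that selection looks roughly like this:

def get_best_delta(tree, parent_ids, basis_id, cache):
    # Compute a delta against every parent tree still in the cache
    # (falling back to the last converted tree) and keep the one with
    # the fewest changed entries.
    possible_trees = [(parent_id, cache[parent_id])
                      for parent_id in parent_ids if parent_id in cache]
    if not possible_trees:
        possible_trees.append((basis_id, cache[basis_id]))
    deltas = []
    for candidate_id, candidate_tree in possible_trees:
        delta = tree.inventory._make_delta(candidate_tree.inventory)
        deltas.append((len(delta), candidate_id, delta))
    deltas.sort()
    return deltas[0][1:]  # (basis_id, delta)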

>>
>> 2) Also, auto-packing as you go avoids the case you ran into, where bzr
>> bloats to 2.4GB before packing back to 25MB. We know the new format is
>> even more sensitive to packing efficiency. Not to mention that a single
>> big-stream generates a single large pack, it isn't directly obvious that
>> we are being so inefficient.
>
> i.e. performance concerns.
>

Generally, yes.

There is also:

3) Being able to resume because you snapshotted periodically as you
went. This seems even more important for a network transfer.

> The streaming code is pretty similar in how it does the conversion now to the
> way IDS did it, but probably still different enough that we will want to measure
> the impact of this. I'm definitely concerned about case 2, the lack of packing
> as you go, although perhaps the degree of bloat is reduced by using
> semantic inventory-delta records?
>

I don't think bzr bloating from 100MB => 2.4GB (and then back down to
25MB post pack) was because of inventory records. However, if it was
purely because of a bad streaming order, we could probably fix that by
changing how we stream texts.

> The reason why I eventually deleted IDS was that it was just too burdensome to
> keep two code paths alive, thoroughly tested, and correct. For instance, if we
> simply reinstated IDS for local-only fetches then most of the test suite,
> including the relevant interrepo tests, will only exercise IDS. Also, IDS
> turned out to have a bug when used on a stacked repository that the extending
> test suite in this branch revealed (I've forgotten the details, but can dig them
> up if you like). It didn't seem worth the hassle of fixing IDS when I already
> had a working imple...


John A Meinel (jameinel) wrote:

...
>
> There is also:
>
> 3) Being able to resume because you snapshotted periodically as you
> went. This seems even more important for a network transfer.

and

4) Progress indication

This is really quite useful for a process that can take *days* to
complete. The Stream code is often quite nice, but the fact that it
gives you only two states:
 'getting stream'
 'inserting stream'

and nothing more is pretty crummy.

John
=:->

John A Meinel (jameinel) wrote:

Andrew Bennetts wrote:
> Andrew Bennetts has proposed merging lp:~spiv/bzr/inventory-delta into lp:bzr.
>
> Requested reviews:
> bzr-core (bzr-core)
>
> This is a pretty big patch. It does lots of things:
>
> * adds new insert_stream and get_stream verbs
> * adds de/serialization of inventory-delta records on the network
> * fixes rich-root generation in StreamSource
> * adds a bunch of new scenarios to per_interrepository tests
> * fixes some 'pack already exist' bugs for packing a single GC pack (i.e. when
> the new pack is already optimal).
> * improves the inventory_delta module a little
> * various miscellaneous fixes and new tests that are hopefully self-evident
> * and, most controversially, removes InterDifferingSerializer.
>
> From John's mail a while back there were a bunch of issues with removing IDS. I
> think the outstanding ones are:

So for starters, let me mention what I found wrt performance:

time bzr.dev branch mysql-1k mysql-2a/1k
  real 3m18.490s

time bzr.dev+xml8 branch mysql-1k mysql-2a/1k
  real 2m29.953s

+xml8 is just this patch:
=== modified file 'bzrlib/xml8.py'
--- bzrlib/xml8.py 2009-07-07 04:32:13 +0000
+++ bzrlib/xml8.py 2009-07-16 16:14:38 +0000
@@ -433,9 +433,9 @@
                 pass
             else:
                 # Only copying directory entries drops us 2.85s => 2.35s
-                # if cached_ie.kind == 'directory':
-                #     return cached_ie.copy()
-                # return cached_ie
+                if cached_ie.kind == 'directory':
+                    return cached_ie.copy()
+                return cached_ie
                 return cached_ie.copy()

         kind = elt.tag

It has 2 basic effects:

1) Avoid copying all inventory entries all the time (so reduce the time
spent in InventoryEntry.copy())

2) By re-using the exact same objects, "_make_delta" can do "x is y"
comparisons, rather than having to do:
  x.attribute1 == y.attribute1
  and x.attribute2 == y.attribute2
etc. (see the sketch below).
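
Illustrative sketch (not actual bzrlib code) of why identity helps:

# Once the deserializer re-uses entry objects, an identity check can
# replace the field-by-field comparison _make_delta otherwise needs.
def entry_changed(x, y):
    if x is y:
        return False  # shared object: O(1), no attribute access at all
    return (x.file_id != y.file_id
            or x.name != y.name
            or x.parent_id != y.parent_id
            or x.revision != y.revision)  # ...and so on per attribute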

As you can see, it is a big win for this test case (about 4:3, or 33% faster).

So what about Andrew's work:

time bzr.inv.delta branch mysql-1k mysql-2a/1k
  real 10m14.267s

time bzr.inv.delta+xml8 branch mysql-1k mysql-2a/1k
  real 9m49.372s

It was also stuck at:
[##################- ] Fetching revisions:Inserting stream:Walking
content 912/1043

for most of that time, making it really look like it was stalled.

Anyway, this isn't something where it is, say, 10% slower, which would be
acceptable because we get rid of some extra code paths. It ends up being
3-4x slower and no longer gives any progress information.

If that scales to Launchpad-sized projects, you are talking 4 days
becoming 16 days (i.e. more than 2 weeks).

So honestly, I don't think we can land this as is. I won't insist on the
performance side if people feel it is acceptable, but I did spend a lot
of time optimizing IDS, and that work clearly hasn't been done for
StreamSource.

John
=:->

John A Meinel (jameinel) wrote:

John A Meinel wrote:
> Andrew Bennetts wrote:

...

> So for starters, let me mention what I found wrt performance:
>
> time bzr.dev branch mysql-1k mysql-2a/1k
> real 3m18.490s
>
> time bzr.dev+xml8 branch mysql-1k mysql-2a/1k
> real 2m29.953s

...

> time bzr.inv.delta branch mysql-1k mysql-2a/1k
> real 10m14.267s
>
> time bzr.inv.delta+xml8 branch mysql-1k mysql-2a/1k
> real 9m49.372s

Also, for real-world space issues:
$ du -ksh mysql-2a*/.bzr/repository/obsolete*
1.9M mysql-2a-bzr.dev/.bzr/repository/obsolete_packs
467M mysql-2a-inv-delta/.bzr/repository/obsolete_packs

The peak size (watch du -ksh mysql-2a-bzr.dev) during conversion using
IDS was 49MB.

$ du -ksh mysql-2a*/.bzr/repository/packs*
11M mysql-2a-bzr.dev/.bzr/repository/packs
9.1M mysql-2a-inv-delta/.bzr/repository/packs

So the new code wins slightly on final size on disk, because it packed
at the end rather than at 1k revs (after which another 40+ revs were
inserted).

However, it bloated from 15MB => 467MB while doing the transfer, before
reaching that final size, versus a peak of 50MB for IDS (almost 10x
larger).

John
=:->

Andrew Bennetts (spiv) wrote:

John A Meinel wrote:
[...]
> It also picks out the 'optimal' deltas by computing many different ones
> and finding whichever one was the 'smallest'. For local conversions, the
> time to compute 2-3 deltas was much smaller than to apply an inefficient
> delta.

FWIW, the streaming code also does this. My guess (not yet measured) is
that sending fewer bytes over the network is also a win, especially when
one parent might be a one-liner and the other might be a large merge
from trunk.

[...]
> There is also:
>
> 3) Being able to resume because you snapshotted periodically as you
> went. This seems even more important for a network transfer.

Yes, although we already don't have this for the network. It would be great to
have...

[...]
> I'm certainly open to the suggestion of getting rid of IDS. I don't like
> having multiple code paths. It just happens that there are *big* wins
> and it is often easier to write optimized code in a different framework.

Sure. Like I said, for me it was just getting to be a large hassle to
maintain both paths in my branch, even though they were increasingly
sharing a lot of code (e.g. for rich-root generation) before I deleted
IDS.

I'd like to see if we can cheaply fix the performance issues you report
in other mails without needing IDS. If we do need IDS for a while longer,
then fine, although I think we'll want to restrict it to local-source,
local-target, non-stacked cases only.
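
A hypothetical sketch of that restriction, building on the existing
is_compatible in the preview diff (the transport and stacking checks are
my invention, not code from this branch):

@staticmethod
def is_compatible(source, target):
    if source.supports_rich_root() and not target.supports_rich_root():
        return False
    if (source._format.supports_tree_reference
            and not target._format.supports_tree_reference):
        return False
    # Hypothetical extra guards: only use IDS when both ends are local
    # and unstacked; let StreamSource handle everything else.
    if not source.bzrdir.transport.base.startswith('file://'):
        return False
    if not target.bzrdir.transport.base.startswith('file://'):
        return False
    if source._fallback_repositories or target._fallback_repositories:
        return False
    return True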

Thanks for the measurements and quick feedback.

-Andrew.

Robert Collins (lifeless) wrote:

On Thu, 2009-07-16 at 16:06 +0000, John A Meinel wrote:
>
> (3) is an issue I'd like to see addressed, but which Robert seems
> particularly unhappy having us try to do. (See other bug comments, etc
> about how other systems don't do it and he feels it isn't worth
> doing.)

I'd like to be clear about this. I'd be ecstatic *if* we can do it well
and robustly. However, I don't think it is *at all* easy to do that. If
I'm wrong, great.

I'm fine with keeping IDS for local fetches. But when networking is
involved, IDS is massively slower than the streaming codepath.

> It was fairly straightforward to do with IDS, the argument I think
> from
> Robert is that the client would need to be computing whether it has a
> 'complete' set and thus can commit the current write group. (the
> *source* knows these sort of things, and can just say "and now you
> have
> it", but the client has to re-do all that work to figure it out from a
> stream.)

I think that aspect is simple: we have a stream subtype that says
'checkpoint'. It's the requirement to do all that work that is, I think,
problematic, and that's *without* considering stacking, which makes it
hugely harder.
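
A hypothetical sketch of that subtype on the sink side (nothing like
this exists today; insert_substream stands in for the sink's normal
per-substream handling):

def insert_with_checkpoints(sink_repo, stream, insert_substream):
    # Commit the write group at each 'checkpoint' marker, giving a
    # durable, resumable snapshot. The hard part is on the source side:
    # it may only emit a checkpoint once everything sent so far forms a
    # self-consistent repository state, which is expensive to compute
    # (and harder still with stacking).
    sink_repo.start_write_group()
    for kind, substream in stream:
        if kind == 'checkpoint':
            sink_repo.commit_write_group()
            sink_repo.start_write_group()
        else:
            insert_substream(kind, substream)
    sink_repo.commit_write_group()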

-Rob

Robert Collins (lifeless) wrote:

On Thu, 2009-07-16 at 16:12 +0000, John A Meinel wrote:
>
>
> 4) Progress indication
>
> This is really quite useful for a process that can take *days* to
> complete. The Stream code is often quite nice, but the fact that it
> gives you 2 states:
> 'getting stream'
> 'inserting stream'
>
> and nothing more than that is pretty crummy.

That is a separate bug, however, and one that affects normal fetches
too. So I don't think tying it to the IDS discussion is necessary or
particularly helpful.

-Rob

John A Meinel (jameinel) wrote:

Robert Collins wrote:
> On Thu, 2009-07-16 at 16:12 +0000, John A Meinel wrote:
>>
>> 4) Progress indication
>>
>> This is really quite useful for a process that can take *days* to
>> complete. The Stream code is often quite nice, but the fact that it
>> gives you 2 states:
>> 'getting stream'
>> 'inserting stream'
>>
>> and nothing more than that is pretty crummy.
>
> That is a separate bug however, and one that affects normal fetches too.
> So I don't think tying it to the IDS discussion is necessary or
> particularly helpful.
>
> -Rob
>

It is explicitly relevant that "bzr upgrade --2a", which will take
longer than normal, would now not even show a progress bar.

For local fetches, you don't even get the "transport activity"
indicator, so it *really* looks hung. It doesn't even write things into
.bzr.log so that you know it is doing anything other than spinning in a
while True loop. I guess you can tell because your disk consumption is
going way up...

I don't honestly know the performance difference for streaming a lot of
content over the network. Given a 4x performance slowdown, for large
fetches IDS could still be faster. I certainly agree that IDS is
probably significantly more inefficient when doing something like "give
me the last 2 revs".

Cross-format fetching honestly wasn't something I was optimizing for. I
*was* trying to make 'bzr upgrade' be measured in hours rather than
days/weeks/etc.

Also, given that you have to upgrade all of your stacked locations at
the same time, and --2a is a trap door, aren't 95% of upgrades going to
be all at once anyway?

John
=:->


Preview Diff

1=== modified file 'NEWS'
2--- NEWS 2009-07-20 21:21:10 +0000
3+++ NEWS 2009-07-22 00:35:22 +0000
4@@ -56,6 +56,9 @@
5 lots of backtraces about ``UnknownSmartMethod``, ``do_chunk`` or
6 ``do_end``. (Andrew Bennetts, #338561)
7
8+* StreamSource generates rich roots from non-rich root sources correctly
9+ now. (Andrew Bennetts, #368921)
10+
11 * ``WorkingTree4.unversion`` will no longer fail to unversion ids which
12 were present in a parent tree but renamed in the working tree.
13 (Robert Collins, #187207)
14@@ -63,6 +66,12 @@
15 Improvements
16 ************
17
18+* Cross-format fetches (such as between 1.9-rich-root and 2a) via the
19+ smart server are more efficient now. They send inventory deltas rather
20+ than full inventories. The smart server has two new requests,
21+ ``Repository.get_stream_1.18`` and ``Repository.insert_stream_1.18`` to
22+ support this. (Andrew Bennetts, #374738, #385826)
23+
24 Documentation
25 *************
26
27@@ -84,6 +93,9 @@
28 * ``CHKMap.apply_delta`` now raises ``InconsistentDelta`` if a delta adds
29 as new a key which was already mapped. (Robert Collins)
30
31+* InterDifferingSerializer has been removed. The transformations it
32+ provided are now done automatically by StreamSource. (Andrew Bennetts)
33+
34 * Inventory delta application catches more cases of corruption and can
35 prevent corrupt deltas from affecting consistency of data structures on
36 disk. (Robert Collins)
37
38=== modified file 'bzrlib/fetch.py'
39--- bzrlib/fetch.py 2009-06-17 17:57:15 +0000
40+++ bzrlib/fetch.py 2009-07-22 00:35:22 +0000
41@@ -25,16 +25,21 @@
42
43 import operator
44
45+from bzrlib.lazy_import import lazy_import
46+lazy_import(globals(), """
47+from bzrlib import (
48+ tsort,
49+ versionedfile,
50+ )
51+""")
52 import bzrlib
53 from bzrlib import (
54 errors,
55 symbol_versioning,
56 )
57 from bzrlib.revision import NULL_REVISION
58-from bzrlib.tsort import topo_sort
59 from bzrlib.trace import mutter
60 import bzrlib.ui
61-from bzrlib.versionedfile import FulltextContentFactory
62
63
64 class RepoFetcher(object):
65@@ -216,11 +221,9 @@
66
67 def _find_root_ids(self, revs, parent_map, graph):
68 revision_root = {}
69- planned_versions = {}
70 for tree in self.iter_rev_trees(revs):
71 revision_id = tree.inventory.root.revision
72 root_id = tree.get_root_id()
73- planned_versions.setdefault(root_id, []).append(revision_id)
74 revision_root[revision_id] = root_id
75 # Find out which parents we don't already know root ids for
76 parents = set()
77@@ -232,7 +235,7 @@
78 for tree in self.iter_rev_trees(parents):
79 root_id = tree.get_root_id()
80 revision_root[tree.get_revision_id()] = root_id
81- return revision_root, planned_versions
82+ return revision_root
83
84 def generate_root_texts(self, revs):
85 """Generate VersionedFiles for all root ids.
86@@ -241,9 +244,8 @@
87 """
88 graph = self.source.get_graph()
89 parent_map = graph.get_parent_map(revs)
90- rev_order = topo_sort(parent_map)
91- rev_id_to_root_id, root_id_to_rev_ids = self._find_root_ids(
92- revs, parent_map, graph)
93+ rev_order = tsort.topo_sort(parent_map)
94+ rev_id_to_root_id = self._find_root_ids(revs, parent_map, graph)
95 root_id_order = [(rev_id_to_root_id[rev_id], rev_id) for rev_id in
96 rev_order]
97 # Guaranteed stable, this groups all the file id operations together
98@@ -252,20 +254,77 @@
99 # yet, and are unlikely to in non-rich-root environments anyway.
100 root_id_order.sort(key=operator.itemgetter(0))
101 # Create a record stream containing the roots to create.
102- def yield_roots():
103- for key in root_id_order:
104- root_id, rev_id = key
105- rev_parents = parent_map[rev_id]
106- # We drop revision parents with different file-ids, because
107- # that represents a rename of the root to a different location
108- # - its not actually a parent for us. (We could look for that
109- # file id in the revision tree at considerably more expense,
110- # but for now this is sufficient (and reconcile will catch and
111- # correct this anyway).
112- # When a parent revision is a ghost, we guess that its root id
113- # was unchanged (rather than trimming it from the parent list).
114- parent_keys = tuple((root_id, parent) for parent in rev_parents
115- if parent != NULL_REVISION and
116- rev_id_to_root_id.get(parent, root_id) == root_id)
117- yield FulltextContentFactory(key, parent_keys, None, '')
118- return [('texts', yield_roots())]
119+ from bzrlib.graph import FrozenHeadsCache
120+ graph = FrozenHeadsCache(graph)
121+ new_roots_stream = _new_root_data_stream(
122+ root_id_order, rev_id_to_root_id, parent_map, self.source, graph)
123+ return [('texts', new_roots_stream)]
124+
125+
126+def _new_root_data_stream(
127+ root_keys_to_create, rev_id_to_root_id_map, parent_map, repo, graph=None):
128+ for root_key in root_keys_to_create:
129+ root_id, rev_id = root_key
130+ parent_keys = _parent_keys_for_root_version(
131+ root_id, rev_id, rev_id_to_root_id_map, parent_map, repo, graph)
132+ yield versionedfile.FulltextContentFactory(
133+ root_key, parent_keys, None, '')
134+
135+
136+def _parent_keys_for_root_version(
137+ root_id, rev_id, rev_id_to_root_id_map, parent_map, repo, graph=None):
138+ """Get the parent keys for a given root id."""
139+ # Include direct parents of the revision, but only if they used the same
140+ # root_id and are heads.
141+ rev_parents = parent_map[rev_id]
142+ parent_ids = []
143+ for parent_id in rev_parents:
144+ if parent_id == NULL_REVISION:
145+ continue
146+ if parent_id not in rev_id_to_root_id_map:
147+ # We probably didn't read this revision, go spend the extra effort
148+ # to actually check
149+ try:
150+ tree = repo.revision_tree(parent_id)
151+ except errors.NoSuchRevision:
152+ # Ghost, fill out rev_id_to_root_id in case we encounter this
153+ # again.
154+ # But set parent_root_id to None since we don't really know
155+ parent_root_id = None
156+ else:
157+ parent_root_id = tree.get_root_id()
158+ rev_id_to_root_id_map[parent_id] = None
159+ # XXX: why not:
160+ # rev_id_to_root_id_map[parent_id] = parent_root_id
161+ # memory consumption maybe?
162+ else:
163+ parent_root_id = rev_id_to_root_id_map[parent_id]
164+ if root_id == parent_root_id:
165+ # With stacking we _might_ want to refer to a non-local revision,
166+ # but this code path only applies when we have the full content
167+ # available, so ghosts really are ghosts, not just the edge of
168+ # local data.
169+ parent_ids.append(parent_id)
170+ else:
171+ # root_id may be in the parent anyway.
172+ try:
173+ tree = repo.revision_tree(parent_id)
174+ except errors.NoSuchRevision:
175+ # ghost, can't refer to it.
176+ pass
177+ else:
178+ try:
179+ parent_ids.append(tree.inventory[root_id].revision)
180+ except errors.NoSuchId:
181+ # not in the tree
182+ pass
183+ # Drop non-head parents
184+ if graph is None:
185+ graph = repo.get_graph()
186+ heads = graph.heads(parent_ids)
187+ selected_ids = []
188+ for parent_id in parent_ids:
189+ if parent_id in heads and parent_id not in selected_ids:
190+ selected_ids.append(parent_id)
191+ parent_keys = [(root_id, parent_id) for parent_id in selected_ids]
192+ return parent_keys
193
194=== modified file 'bzrlib/inventory_delta.py'
195--- bzrlib/inventory_delta.py 2009-04-02 05:53:12 +0000
196+++ bzrlib/inventory_delta.py 2009-07-22 00:35:22 +0000
197@@ -119,28 +119,46 @@
198 class InventoryDeltaSerializer(object):
199 """Serialize and deserialize inventory deltas."""
200
201+ # XXX: really, the serializer and deserializer should be two separate
202+ # classes.
203+
204 FORMAT_1 = 'bzr inventory delta v1 (bzr 1.14)'
205
206- def __init__(self, versioned_root, tree_references):
207- """Create an InventoryDeltaSerializer.
208+ def __init__(self):
209+ """Create an InventoryDeltaSerializer."""
210+ self._versioned_root = None
211+ self._tree_references = None
212+ self._entry_to_content = {
213+ 'directory': _directory_content,
214+ 'file': _file_content,
215+ 'symlink': _link_content,
216+ }
217+
218+ def require_flags(self, versioned_root=None, tree_references=None):
219+ """Set the versioned_root and/or tree_references flags for this
220+ (de)serializer.
221
222 :param versioned_root: If True, any root entry that is seen is expected
223 to be versioned, and root entries can have any fileid.
224 :param tree_references: If True support tree-reference entries.
225 """
226+ if versioned_root is not None and self._versioned_root is not None:
227+ raise AssertionError(
228+ "require_flags(versioned_root=...) already called.")
229+ if tree_references is not None and self._tree_references is not None:
230+ raise AssertionError(
231+ "require_flags(tree_references=...) already called.")
232 self._versioned_root = versioned_root
233 self._tree_references = tree_references
234- self._entry_to_content = {
235- 'directory': _directory_content,
236- 'file': _file_content,
237- 'symlink': _link_content,
238- }
239 if tree_references:
240 self._entry_to_content['tree-reference'] = _reference_content
241
242 def delta_to_lines(self, old_name, new_name, delta_to_new):
243 """Return a line sequence for delta_to_new.
244
245+ Both the versioned_root and tree_references flags must be set via
246+ require_flags before calling this.
247+
248 :param old_name: A UTF8 revision id for the old inventory. May be
249 NULL_REVISION if there is no older inventory and delta_to_new
250 includes the entire inventory contents.
251@@ -150,6 +168,10 @@
252 takes.
253 :return: The serialized delta as lines.
254 """
255+ if self._versioned_root is None or self._tree_references is None:
256+ raise AssertionError(
257+ "Cannot serialise unless versioned_root/tree_references flags "
258+ "are both set.")
259 lines = ['', '', '', '', '']
260 to_line = self._delta_item_to_line
261 for delta_item in delta_to_new:
262@@ -188,6 +210,10 @@
263 oldpath_utf8 = 'None'
264 else:
265 oldpath_utf8 = '/' + oldpath.encode('utf8')
266+ if newpath == '/':
267+ raise AssertionError(
268+ "Bad inventory delta: '/' is not a valid newpath "
269+ "(should be '') in delta item %r" % (delta_item,))
270 # TODO: Test real-world utf8 cache hit rate. It may be a win.
271 newpath_utf8 = '/' + newpath.encode('utf8')
272 # Serialize None as ''
273@@ -221,10 +247,18 @@
274 def parse_text_bytes(self, bytes):
275 """Parse the text bytes of a serialized inventory delta.
276
277+ If versioned_root and/or tree_references flags were set via
278+ require_flags, then the parsed flags must match or a BzrError will be
279+ raised.
280+
281 :param bytes: The bytes to parse. This can be obtained by calling
282 delta_to_lines and then doing ''.join(delta_lines).
283- :return: (parent_id, new_id, inventory_delta)
284+ :return: (parent_id, new_id, versioned_root, tree_references,
285+ inventory_delta)
286 """
287+ if bytes[-1:] != '\n':
288+ last_line = bytes.rsplit('\n', 1)[-1]
289+ raise errors.BzrError('last line not empty: %r' % (last_line,))
290 lines = bytes.split('\n')[:-1] # discard the last empty line
291 if not lines or lines[0] != 'format: %s' % InventoryDeltaSerializer.FORMAT_1:
292 raise errors.BzrError('unknown format %r' % lines[0:1])
293@@ -240,11 +274,13 @@
294 if len(lines) < 5 or not lines[4].startswith('tree_references: '):
295 raise errors.BzrError('missing tree_references: marker')
296 delta_tree_references = self._deserialize_bool(lines[4][17:])
297- if delta_versioned_root != self._versioned_root:
298+ if (self._versioned_root is not None and
299+ delta_versioned_root != self._versioned_root):
300 raise errors.BzrError(
301 "serialized versioned_root flag is wrong: %s" %
302 (delta_versioned_root,))
303- if delta_tree_references != self._tree_references:
304+ if (self._tree_references is not None
305+ and delta_tree_references != self._tree_references):
306 raise errors.BzrError(
307 "serialized tree_references flag is wrong: %s" %
308 (delta_tree_references,))
309@@ -266,22 +302,34 @@
310 raise errors.BzrError("Versioned root found: %r" % line)
311 elif last_modified[-1] == ':':
312 raise errors.BzrError('special revisionid found: %r' % line)
313- if not delta_tree_references and content.startswith('tree\x00'):
314+ if delta_tree_references is False and content.startswith('tree\x00'):
315 raise errors.BzrError("Tree reference found: %r" % line)
316- content_tuple = tuple(content.split('\x00'))
317- entry = _parse_entry(
318- newpath_utf8, file_id, parent_id, last_modified, content_tuple)
319 if oldpath_utf8 == 'None':
320 oldpath = None
321+ elif oldpath_utf8[:1] != '/':
322+ raise errors.BzrError(
323+ "oldpath invalid (does not start with /): %r"
324+ % (oldpath_utf8,))
325 else:
326+ oldpath_utf8 = oldpath_utf8[1:]
327 oldpath = oldpath_utf8.decode('utf8')
328 if newpath_utf8 == 'None':
329 newpath = None
330+ elif newpath_utf8[:1] != '/':
331+ raise errors.BzrError(
332+ "newpath invalid (does not start with /): %r"
333+ % (newpath_utf8,))
334 else:
335+ # Trim leading slash
336+ newpath_utf8 = newpath_utf8[1:]
337 newpath = newpath_utf8.decode('utf8')
338+ content_tuple = tuple(content.split('\x00'))
339+ entry = _parse_entry(
340+ newpath_utf8, file_id, parent_id, last_modified, content_tuple)
341 delta_item = (oldpath, newpath, file_id, entry)
342 result.append(delta_item)
343- return delta_parent_id, delta_version_id, result
344+ return (delta_parent_id, delta_version_id, delta_versioned_root,
345+ delta_tree_references, result)
346
347
348 def _parse_entry(utf8_path, file_id, parent_id, last_modified, content):
349
350=== modified file 'bzrlib/remote.py'
351--- bzrlib/remote.py 2009-07-06 09:47:35 +0000
352+++ bzrlib/remote.py 2009-07-22 00:35:22 +0000
353@@ -422,6 +422,7 @@
354 self._custom_format = None
355 self._network_name = None
356 self._creating_bzrdir = None
357+ self._supports_chks = None
358 self._supports_external_lookups = None
359 self._supports_tree_reference = None
360 self._rich_root_data = None
361@@ -439,6 +440,13 @@
362 return self._rich_root_data
363
364 @property
365+ def supports_chks(self):
366+ if self._supports_chks is None:
367+ self._ensure_real()
368+ self._supports_chks = self._custom_format.supports_chks
369+ return self._supports_chks
370+
371+ @property
372 def supports_external_lookups(self):
373 if self._supports_external_lookups is None:
374 self._ensure_real()
375@@ -575,6 +583,11 @@
376 self._ensure_real()
377 return self._custom_format._serializer
378
379+ @property
380+ def repository_class(self):
381+ self._ensure_real()
382+ return self._custom_format.repository_class
383+
384
385 class RemoteRepository(_RpcHelper):
386 """Repository accessed over rpc.
387@@ -1158,9 +1171,9 @@
388 self._ensure_real()
389 return self._real_repository.get_inventory(revision_id)
390
391- def iter_inventories(self, revision_ids):
392+ def iter_inventories(self, revision_ids, ordering='unordered'):
393 self._ensure_real()
394- return self._real_repository.iter_inventories(revision_ids)
395+ return self._real_repository.iter_inventories(revision_ids, ordering)
396
397 @needs_read_lock
398 def get_revision(self, revision_id):
399@@ -1647,6 +1660,9 @@
400
401 class RemoteStreamSink(repository.StreamSink):
402
403+ def __init__(self, target_repo):
404+ repository.StreamSink.__init__(self, target_repo)
405+
406 def _insert_real(self, stream, src_format, resume_tokens):
407 self.target_repo._ensure_real()
408 sink = self.target_repo._real_repository._get_sink()
409@@ -1658,43 +1674,57 @@
410 def insert_stream(self, stream, src_format, resume_tokens):
411 target = self.target_repo
412 target._unstacked_provider.missing_keys.clear()
413+ candidate_calls = [('Repository.insert_stream_1.18', (1, 18))]
414 if target._lock_token:
415- verb = 'Repository.insert_stream_locked'
416- extra_args = (target._lock_token or '',)
417- required_version = (1, 14)
418+ candidate_calls.append(('Repository.insert_stream_locked', (1, 14)))
419+ lock_args = (target._lock_token or '',)
420 else:
421- verb = 'Repository.insert_stream'
422- extra_args = ()
423- required_version = (1, 13)
424+ candidate_calls.append(('Repository.insert_stream', (1, 13)))
425+ lock_args = ()
426 client = target._client
427 medium = client._medium
428- if medium._is_remote_before(required_version):
429- # No possible way this can work.
430- return self._insert_real(stream, src_format, resume_tokens)
431 path = target.bzrdir._path_for_remote_call(client)
432- if not resume_tokens:
433- # XXX: Ugly but important for correctness, *will* be fixed during
434- # 1.13 cycle. Pushing a stream that is interrupted results in a
435- # fallback to the _real_repositories sink *with a partial stream*.
436- # Thats bad because we insert less data than bzr expected. To avoid
437- # this we do a trial push to make sure the verb is accessible, and
438- # do not fallback when actually pushing the stream. A cleanup patch
439- # is going to look at rewinding/restarting the stream/partial
440- # buffering etc.
441+ found_verb = False
442+ for verb, required_version in candidate_calls:
443+ if medium._is_remote_before(required_version):
444+ continue
445+ if resume_tokens:
446+ # We've already done the probing (and set _is_remote_before) on
447+ # a previous insert.
448+ found_verb = True
449+ break
450 byte_stream = smart_repo._stream_to_byte_stream([], src_format)
451 try:
452 response = client.call_with_body_stream(
453- (verb, path, '') + extra_args, byte_stream)
454+ (verb, path, '') + lock_args, byte_stream)
455 except errors.UnknownSmartMethod:
456 medium._remember_remote_is_before(required_version)
457- return self._insert_real(stream, src_format, resume_tokens)
458+ else:
459+ found_verb = True
460+ break
461+ if not found_verb:
462+ # Have to use VFS.
463+ return self._insert_real(stream, src_format, resume_tokens)
464+ self._last_inv_record = None
465+ self._last_substream = None
466+ if required_version < (1, 18):
467+ # Remote side doesn't support inventory deltas. Wrap the stream to
468+ # make sure we don't send any. If the stream contains inventory
469+ # deltas we'll interrupt the smart insert_stream request and
470+ # fallback to VFS.
471+ stream = self._stop_stream_if_inventory_delta(stream)
472 byte_stream = smart_repo._stream_to_byte_stream(
473 stream, src_format)
474 resume_tokens = ' '.join(resume_tokens)
475 response = client.call_with_body_stream(
476- (verb, path, resume_tokens) + extra_args, byte_stream)
477+ (verb, path, resume_tokens) + lock_args, byte_stream)
478 if response[0][0] not in ('ok', 'missing-basis'):
479 raise errors.UnexpectedSmartServerResponse(response)
480+ if self._last_inv_record is not None:
481+ # The stream included an inventory-delta record, but the remote
482+ # side isn't new enough to support them. So we need to send the
483+ # rest of the stream via VFS.
484+ return self._resume_stream_with_vfs(response, src_format)
485 if response[0][0] == 'missing-basis':
486 tokens, missing_keys = bencode.bdecode_as_tuple(response[0][1])
487 resume_tokens = tokens
488@@ -1703,6 +1733,60 @@
489 self.target_repo.refresh_data()
490 return [], set()
491
492+ def _resume_stream_with_vfs(self, response, src_format):
493+ """Resume sending a stream via VFS, first resending the record and
494+ substream that couldn't be sent via an insert_stream verb.
495+ """
496+ if response[0][0] == 'missing-basis':
497+ tokens, missing_keys = bencode.bdecode_as_tuple(response[0][1])
498+ # Ignore missing_keys, we haven't finished inserting yet
499+ else:
500+ tokens = []
501+ def resume_substream():
502+ # First yield the record we stopped at.
503+ yield self._last_inv_record
504+ self._last_inv_record = None
505+ # Then yield the rest of the substream that was interrupted.
506+ for record in self._last_substream:
507+ yield record
508+ self._last_substream = None
509+ def resume_stream():
510+ # Finish sending the interrupted substream
511+ yield ('inventories', resume_substream())
512+ # Then simply continue sending the rest of the stream.
513+ for substream_kind, substream in self._last_stream:
514+ yield substream_kind, substream
515+ return self._insert_real(resume_stream(), src_format, tokens)
516+
517+ def _stop_stream_if_inventory_delta(self, stream):
518+ """Normally this just lets the original stream pass-through unchanged.
519+
520+ However if any 'inventories' substream includes an inventory-delta
521+ record it will stop streaming, and store the interrupted record,
522+ substream and stream in self._last_inv_record, self._last_substream and
523+ self._last_stream so that the stream can be resumed by
524+ _resume_stream_with_vfs.
525+ """
526+ def filter_inv_substream(inv_substream):
527+ substream_iter = iter(inv_substream)
528+ for record in substream_iter:
529+ if record.storage_kind == 'inventory-delta':
530+ self._last_inv_record = record
531+ self._last_substream = substream_iter
532+ return
533+ else:
534+ yield record
535+
536+ stream_iter = iter(stream)
537+ for substream_kind, substream in stream_iter:
538+ if substream_kind == 'inventories':
539+ yield substream_kind, filter_inv_substream(substream)
540+ if self._last_inv_record is not None:
541+ self._last_stream = stream_iter
542+ return
543+ else:
544+ yield substream_kind, substream
545+
546
547 class RemoteStreamSource(repository.StreamSource):
548 """Stream data from a remote server."""
549@@ -1714,6 +1798,12 @@
550 return self.missing_parents_chain(search, [self.from_repository] +
551 self.from_repository._fallback_repositories)
552
553+ def get_stream_for_missing_keys(self, missing_keys):
554+ self.from_repository._ensure_real()
555+ real_repo = self.from_repository._real_repository
556+ real_source = real_repo._get_source(self.to_format)
557+ return real_source.get_stream_for_missing_keys(missing_keys)
558+
559 def _real_stream(self, repo, search):
560 """Get a stream for search from repo.
561
562@@ -1748,18 +1838,26 @@
563 return self._real_stream(repo, search)
564 client = repo._client
565 medium = client._medium
566- if medium._is_remote_before((1, 13)):
567- # streaming was added in 1.13
568- return self._real_stream(repo, search)
569 path = repo.bzrdir._path_for_remote_call(client)
570- try:
571- search_bytes = repo._serialise_search_result(search)
572- response = repo._call_with_body_bytes_expecting_body(
573- 'Repository.get_stream',
574- (path, self.to_format.network_name()), search_bytes)
575- response_tuple, response_handler = response
576- except errors.UnknownSmartMethod:
577- medium._remember_remote_is_before((1,13))
578+ search_bytes = repo._serialise_search_result(search)
579+ args = (path, self.to_format.network_name())
580+ candidate_verbs = [
581+ ('Repository.get_stream_1.18', (1, 18)),
582+ ('Repository.get_stream', (1, 13))]
583+ found_verb = False
584+ for verb, version in candidate_verbs:
585+ if medium._is_remote_before(version):
586+ continue
587+ try:
588+ response = repo._call_with_body_bytes_expecting_body(
589+ verb, args, search_bytes)
590+ except errors.UnknownSmartMethod:
591+ medium._remember_remote_is_before(version)
592+ else:
593+ response_tuple, response_handler = response
594+ found_verb = True
595+ break
596+ if not found_verb:
597 return self._real_stream(repo, search)
598 if response_tuple[0] != 'ok':
599 raise errors.UnexpectedSmartServerResponse(response_tuple)
600
601=== modified file 'bzrlib/repofmt/groupcompress_repo.py'
602--- bzrlib/repofmt/groupcompress_repo.py 2009-07-14 17:33:13 +0000
603+++ bzrlib/repofmt/groupcompress_repo.py 2009-07-22 00:35:23 +0000
604@@ -154,6 +154,8 @@
605 self._writer.begin()
606 # what state is the pack in? (open, finished, aborted)
607 self._state = 'open'
608+ # no name until we finish writing the content
609+ self.name = None
610
611 def _check_references(self):
612 """Make sure our external references are present.
613@@ -466,6 +468,13 @@
614 if not self._use_pack(self.new_pack):
615 self.new_pack.abort()
616 return None
617+ self.new_pack.finish_content()
618+ if len(self.packs) == 1:
619+ old_pack = self.packs[0]
620+ if old_pack.name == self.new_pack._hash.hexdigest():
621+ # The single old pack was already optimally packed.
622+ self.new_pack.abort()
623+ return None
624 self.pb.update('finishing repack', 6, 7)
625 self.new_pack.finish()
626 self._pack_collection.allocate(self.new_pack)
627@@ -766,10 +775,10 @@
628 if basis_tree is not None:
629 basis_tree.unlock()
630
631- def _iter_inventories(self, revision_ids):
632+ def _iter_inventories(self, revision_ids, ordering):
633 """Iterate over many inventory objects."""
634 keys = [(revision_id,) for revision_id in revision_ids]
635- stream = self.inventories.get_record_stream(keys, 'unordered', True)
636+ stream = self.inventories.get_record_stream(keys, ordering, True)
637 texts = {}
638 for record in stream:
639 if record.storage_kind != 'absent':
640@@ -779,7 +788,7 @@
641 for key in keys:
642 yield inventory.CHKInventory.deserialise(self.chk_bytes, texts[key], key)
643
644- def _iter_inventory_xmls(self, revision_ids):
645+ def _iter_inventory_xmls(self, revision_ids, ordering):
646 # Without a native 'xml' inventory, this method doesn't make sense, so
647 # make it raise to trap naughty direct users.
648 raise NotImplementedError(self._iter_inventory_xmls)
649@@ -879,14 +888,13 @@
650
651 def _get_source(self, to_format):
652 """Return a source for streaming from this repository."""
653- if isinstance(to_format, remote.RemoteRepositoryFormat):
654- # Can't just check attributes on to_format with the current code,
655- # work around this:
656- to_format._ensure_real()
657- to_format = to_format._custom_format
658- if to_format.__class__ is self._format.__class__:
659+ if (to_format.supports_chks and
660+ self._format.repository_class is to_format.repository_class and
661+ self._format._serializer == to_format._serializer):
662 # We must be exactly the same format, otherwise stuff like the chk
663- # page layout might be different
664+ # page layout might be different.
665+ # Actually, this test is just slightly looser than exact so that
666+ # CHK2 <-> 2a transfers will work.
667 return GroupCHKStreamSource(self, to_format)
668 return super(CHKInventoryRepository, self)._get_source(to_format)
669
670@@ -1036,8 +1044,6 @@
671 repository_class = CHKInventoryRepository
672 supports_external_lookups = True
673 supports_chks = True
674- # For right now, setting this to True gives us InterModel1And2 rather
675- # than InterDifferingSerializer
676 _commit_builder_class = PackRootCommitBuilder
677 rich_root_data = True
678 _serializer = chk_serializer.chk_serializer_255_bigpage
679
680=== modified file 'bzrlib/repofmt/pack_repo.py'
681--- bzrlib/repofmt/pack_repo.py 2009-07-01 10:42:14 +0000
682+++ bzrlib/repofmt/pack_repo.py 2009-07-22 00:35:23 +0000
683@@ -422,6 +422,8 @@
684 self._writer.begin()
685 # what state is the pack in? (open, finished, aborted)
686 self._state = 'open'
687+ # no name until we finish writing the content
688+ self.name = None
689
690 def abort(self):
691 """Cancel creating this pack."""
692@@ -448,6 +450,14 @@
693 self.signature_index.key_count() or
694 (self.chk_index is not None and self.chk_index.key_count()))
695
696+ def finish_content(self):
697+ if self.name is not None:
698+ return
699+ self._writer.end()
700+ if self._buffer[1]:
701+ self._write_data('', flush=True)
702+ self.name = self._hash.hexdigest()
703+
704 def finish(self, suspend=False):
705 """Finish the new pack.
706
707@@ -459,10 +469,7 @@
708 - stores the index size tuple for the pack in the index_sizes
709 attribute.
710 """
711- self._writer.end()
712- if self._buffer[1]:
713- self._write_data('', flush=True)
714- self.name = self._hash.hexdigest()
715+ self.finish_content()
716 if not suspend:
717 self._check_references()
718 # write indices
719@@ -1567,7 +1574,7 @@
720 # determine which packs need changing
721 pack_operations = [[0, []]]
722 for pack in self.all_packs():
723- if not hint or pack.name in hint:
724+ if hint is None or pack.name in hint:
725 pack_operations[-1][0] += pack.get_revision_count()
726 pack_operations[-1][1].append(pack)
727 self._execute_pack_operations(pack_operations, OptimisingPacker)
728@@ -2093,6 +2100,7 @@
729 # when autopack takes no steps, the names list is still
730 # unsaved.
731 return self._save_pack_names()
732+ return []
733
734 def _suspend_write_group(self):
735 tokens = [pack.name for pack in self._resumed_packs]
736
737=== modified file 'bzrlib/repository.py'
738--- bzrlib/repository.py 2009-07-02 23:10:53 +0000
739+++ bzrlib/repository.py 2009-07-22 00:35:22 +0000
740@@ -31,6 +31,7 @@
741 gpg,
742 graph,
743 inventory,
744+ inventory_delta,
745 lazy_regex,
746 lockable_files,
747 lockdir,
748@@ -923,6 +924,11 @@
749 """
750 if self._write_group is not self.get_transaction():
751 # has an unlock or relock occured ?
752+ if suppress_errors:
753+ mutter(
754+ '(suppressed) mismatched lock context and write group. %r, %r',
755+ self._write_group, self.get_transaction())
756+ return
757 raise errors.BzrError(
758 'mismatched lock context and write group. %r, %r' %
759 (self._write_group, self.get_transaction()))
760@@ -2178,7 +2184,7 @@
761 """Get Inventory object by revision id."""
762 return self.iter_inventories([revision_id]).next()
763
764- def iter_inventories(self, revision_ids):
765+ def iter_inventories(self, revision_ids, ordering='unordered'):
766 """Get many inventories by revision_ids.
767
768 This will buffer some or all of the texts used in constructing the
769@@ -2186,21 +2192,23 @@
770 time.
771
772 :param revision_ids: The expected revision ids of the inventories.
773+ :param ordering: optional ordering, e.g. 'topological'.
774 :return: An iterator of inventories.
775 """
776 if ((None in revision_ids)
777 or (_mod_revision.NULL_REVISION in revision_ids)):
778 raise ValueError('cannot get null revision inventory')
779- return self._iter_inventories(revision_ids)
780+ return self._iter_inventories(revision_ids, ordering)
781
782- def _iter_inventories(self, revision_ids):
783+ def _iter_inventories(self, revision_ids, ordering):
784 """single-document based inventory iteration."""
785- for text, revision_id in self._iter_inventory_xmls(revision_ids):
786+ inv_xmls = self._iter_inventory_xmls(revision_ids, ordering)
787+ for text, revision_id in inv_xmls:
788 yield self.deserialise_inventory(revision_id, text)
789
790- def _iter_inventory_xmls(self, revision_ids):
791+ def _iter_inventory_xmls(self, revision_ids, ordering='unordered'):
792 keys = [(revision_id,) for revision_id in revision_ids]
793- stream = self.inventories.get_record_stream(keys, 'unordered', True)
794+ stream = self.inventories.get_record_stream(keys, ordering, True)
795 text_chunks = {}
796 for record in stream:
797 if record.storage_kind != 'absent':
798@@ -2236,7 +2244,7 @@
799 @needs_read_lock
800 def get_inventory_xml(self, revision_id):
801 """Get inventory XML as a file object."""
802- texts = self._iter_inventory_xmls([revision_id])
803+ texts = self._iter_inventory_xmls([revision_id], 'unordered')
804 try:
805 text, revision_id = texts.next()
806 except StopIteration:
807@@ -3480,288 +3488,6 @@
808 return self.source.revision_ids_to_search_result(result_set)
809
810
811-class InterDifferingSerializer(InterRepository):
812-
813- @classmethod
814- def _get_repo_format_to_test(self):
815- return None
816-
817- @staticmethod
818- def is_compatible(source, target):
819- """Be compatible with Knit2 source and Knit3 target"""
820- # This is redundant with format.check_conversion_target(), however that
821- # raises an exception, and we just want to say "False" as in we won't
822- # support converting between these formats.
823- if source.supports_rich_root() and not target.supports_rich_root():
824- return False
825- if (source._format.supports_tree_reference
826- and not target._format.supports_tree_reference):
827- return False
828- return True
829-
830- def _get_delta_for_revision(self, tree, parent_ids, basis_id, cache):
831- """Get the best delta and base for this revision.
832-
833- :return: (basis_id, delta)
834- """
835- possible_trees = [(parent_id, cache[parent_id])
836- for parent_id in parent_ids
837- if parent_id in cache]
838- if len(possible_trees) == 0:
839- # There either aren't any parents, or the parents aren't in the
840- # cache, so just use the last converted tree
841- possible_trees.append((basis_id, cache[basis_id]))
842- deltas = []
843- for basis_id, basis_tree in possible_trees:
844- delta = tree.inventory._make_delta(basis_tree.inventory)
845- deltas.append((len(delta), basis_id, delta))
846- deltas.sort()
847- return deltas[0][1:]
848-
849- def _get_parent_keys(self, root_key, parent_map):
850- """Get the parent keys for a given root id."""
851- root_id, rev_id = root_key
852- # Include direct parents of the revision, but only if they used
853- # the same root_id and are heads.
854- parent_keys = []
855- for parent_id in parent_map[rev_id]:
856- if parent_id == _mod_revision.NULL_REVISION:
857- continue
858- if parent_id not in self._revision_id_to_root_id:
859- # We probably didn't read this revision, go spend the
860- # extra effort to actually check
861- try:
862- tree = self.source.revision_tree(parent_id)
863- except errors.NoSuchRevision:
864- # Ghost, fill out _revision_id_to_root_id in case we
865- # encounter this again.
866- # But set parent_root_id to None since we don't really know
867- parent_root_id = None
868- else:
869- parent_root_id = tree.get_root_id()
870- self._revision_id_to_root_id[parent_id] = None
871- else:
872- parent_root_id = self._revision_id_to_root_id[parent_id]
873- if root_id == parent_root_id:
874- # With stacking we _might_ want to refer to a non-local
875- # revision, but this code path only applies when we have the
876- # full content available, so ghosts really are ghosts, not just
877- # the edge of local data.
878- parent_keys.append((parent_id,))
879- else:
880- # root_id may be in the parent anyway.
881- try:
882- tree = self.source.revision_tree(parent_id)
883- except errors.NoSuchRevision:
884- # ghost, can't refer to it.
885- pass
886- else:
887- try:
888- parent_keys.append((tree.inventory[root_id].revision,))
889- except errors.NoSuchId:
890- # not in the tree
891- pass
892- g = graph.Graph(self.source.revisions)
893- heads = g.heads(parent_keys)
894- selected_keys = []
895- for key in parent_keys:
896- if key in heads and key not in selected_keys:
897- selected_keys.append(key)
898- return tuple([(root_id,)+ key for key in selected_keys])
899-
900- def _new_root_data_stream(self, root_keys_to_create, parent_map):
901- for root_key in root_keys_to_create:
902- parent_keys = self._get_parent_keys(root_key, parent_map)
903- yield versionedfile.FulltextContentFactory(root_key,
904- parent_keys, None, '')
905-
906- def _fetch_batch(self, revision_ids, basis_id, cache):
907- """Fetch across a few revisions.
908-
909- :param revision_ids: The revisions to copy
910- :param basis_id: The revision_id of a tree that must be in cache, used
911- as a basis for delta when no other base is available
912- :param cache: A cache of RevisionTrees that we can use.
913- :return: The revision_id of the last converted tree. The RevisionTree
914- for it will be in cache
915- """
916- # Walk though all revisions; get inventory deltas, copy referenced
917- # texts that delta references, insert the delta, revision and
918- # signature.
919- root_keys_to_create = set()
920- text_keys = set()
921- pending_deltas = []
922- pending_revisions = []
923- parent_map = self.source.get_parent_map(revision_ids)
924- for tree in self.source.revision_trees(revision_ids):
925- current_revision_id = tree.get_revision_id()
926- parent_ids = parent_map.get(current_revision_id, ())
927- basis_id, delta = self._get_delta_for_revision(tree, parent_ids,
928- basis_id, cache)
929- if self._converting_to_rich_root:
930- self._revision_id_to_root_id[current_revision_id] = \
931- tree.get_root_id()
932- # Find text entries that need to be copied
933- for old_path, new_path, file_id, entry in delta:
934- if new_path is not None:
935- if not new_path:
936- # This is the root
937- if not self.target.supports_rich_root():
938- # The target doesn't support rich root, so we don't
939- # copy
940- continue
941- if self._converting_to_rich_root:
942- # This can't be copied normally, we have to insert
943- # it specially
944- root_keys_to_create.add((file_id, entry.revision))
945- continue
946- text_keys.add((file_id, entry.revision))
947- revision = self.source.get_revision(current_revision_id)
948- pending_deltas.append((basis_id, delta,
949- current_revision_id, revision.parent_ids))
950- pending_revisions.append(revision)
951- cache[current_revision_id] = tree
952- basis_id = current_revision_id
953- # Copy file texts
954- from_texts = self.source.texts
955- to_texts = self.target.texts
956- if root_keys_to_create:
957- root_stream = self._new_root_data_stream(root_keys_to_create,
958- parent_map)
959- to_texts.insert_record_stream(root_stream)
960- to_texts.insert_record_stream(from_texts.get_record_stream(
961- text_keys, self.target._format._fetch_order,
962- not self.target._format._fetch_uses_deltas))
963- # insert inventory deltas
964- for delta in pending_deltas:
965- self.target.add_inventory_by_delta(*delta)
966- if self.target._fallback_repositories:
967- # Make sure this stacked repository has all the parent inventories
968- # for the new revisions that we are about to insert. We do this
969- # before adding the revisions so that no revision is added until
970- # all the inventories it may depend on are added.
971- parent_ids = set()
972- revision_ids = set()
973- for revision in pending_revisions:
974- revision_ids.add(revision.revision_id)
975- parent_ids.update(revision.parent_ids)
976- parent_ids.difference_update(revision_ids)
977- parent_ids.discard(_mod_revision.NULL_REVISION)
978- parent_map = self.source.get_parent_map(parent_ids)
979- for parent_tree in self.source.revision_trees(parent_ids):
980- basis_id, delta = self._get_delta_for_revision(tree, parent_ids, basis_id, cache)
981- current_revision_id = parent_tree.get_revision_id()
982- parents_parents = parent_map[current_revision_id]
983- self.target.add_inventory_by_delta(
984- basis_id, delta, current_revision_id, parents_parents)
985- # insert signatures and revisions
986- for revision in pending_revisions:
987- try:
988- signature = self.source.get_signature_text(
989- revision.revision_id)
990- self.target.add_signature_text(revision.revision_id,
991- signature)
992- except errors.NoSuchRevision:
993- pass
994- self.target.add_revision(revision.revision_id, revision)
995- return basis_id
996-
997- def _fetch_all_revisions(self, revision_ids, pb):
998- """Fetch everything for the list of revisions.
999-
1000- :param revision_ids: The list of revisions to fetch. Must be in
1001- topological order.
1002- :param pb: A ProgressBar
1003- :return: None
1004- """
1005- basis_id, basis_tree = self._get_basis(revision_ids[0])
1006- batch_size = 100
1007- cache = lru_cache.LRUCache(100)
1008- cache[basis_id] = basis_tree
1009- del basis_tree # We don't want to hang on to it here
1010- hints = []
1011- for offset in range(0, len(revision_ids), batch_size):
1012- self.target.start_write_group()
1013- try:
1014- pb.update('Transferring revisions', offset,
1015- len(revision_ids))
1016- batch = revision_ids[offset:offset+batch_size]
1017- basis_id = self._fetch_batch(batch, basis_id, cache)
1018- except:
1019- self.target.abort_write_group()
1020- raise
1021- else:
1022- hint = self.target.commit_write_group()
1023- if hint:
1024- hints.extend(hint)
1025- if hints and self.target._format.pack_compresses:
1026- self.target.pack(hint=hints)
1027- pb.update('Transferring revisions', len(revision_ids),
1028- len(revision_ids))
1029-
1030- @needs_write_lock
1031- def fetch(self, revision_id=None, pb=None, find_ghosts=False,
1032- fetch_spec=None):
1033- """See InterRepository.fetch()."""
1034- if fetch_spec is not None:
1035- raise AssertionError("Not implemented yet...")
1036- if (not self.source.supports_rich_root()
1037- and self.target.supports_rich_root()):
1038- self._converting_to_rich_root = True
1039- self._revision_id_to_root_id = {}
1040- else:
1041- self._converting_to_rich_root = False
1042- revision_ids = self.target.search_missing_revision_ids(self.source,
1043- revision_id, find_ghosts=find_ghosts).get_keys()
1044- if not revision_ids:
1045- return 0, 0
1046- revision_ids = tsort.topo_sort(
1047- self.source.get_graph().get_parent_map(revision_ids))
1048- if not revision_ids:
1049- return 0, 0
1050- # Walk though all revisions; get inventory deltas, copy referenced
1051- # texts that delta references, insert the delta, revision and
1052- # signature.
1053- first_rev = self.source.get_revision(revision_ids[0])
1054- if pb is None:
1055- my_pb = ui.ui_factory.nested_progress_bar()
1056- pb = my_pb
1057- else:
1058- symbol_versioning.warn(
1059- symbol_versioning.deprecated_in((1, 14, 0))
1060- % "pb parameter to fetch()")
1061- my_pb = None
1062- try:
1063- self._fetch_all_revisions(revision_ids, pb)
1064- finally:
1065- if my_pb is not None:
1066- my_pb.finished()
1067- return len(revision_ids), 0
1068-
1069- def _get_basis(self, first_revision_id):
1070- """Get a revision and tree which exists in the target.
1071-
1072- This assumes that first_revision_id is selected for transmission
1073- because all other ancestors are already present. If we can't find an
1074- ancestor we fall back to NULL_REVISION since we know that is safe.
1075-
1076- :return: (basis_id, basis_tree)
1077- """
1078- first_rev = self.source.get_revision(first_revision_id)
1079- try:
1080- basis_id = first_rev.parent_ids[0]
1081- # only valid as a basis if the target has it
1082- self.target.get_revision(basis_id)
1083- # Try to get a basis tree - if its a ghost it will hit the
1084- # NoSuchRevision case.
1085- basis_tree = self.source.revision_tree(basis_id)
1086- except (IndexError, errors.NoSuchRevision):
1087- basis_id = _mod_revision.NULL_REVISION
1088- basis_tree = self.source.revision_tree(basis_id)
1089- return basis_id, basis_tree
1090-
1091-
1092-InterRepository.register_optimiser(InterDifferingSerializer)
1093 InterRepository.register_optimiser(InterSameDataRepository)
1094 InterRepository.register_optimiser(InterWeaveRepo)
1095 InterRepository.register_optimiser(InterKnitRepo)
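The batching removed above is the crux of concern 1 from John's mail: IDS
committed a write group per batch of 100, collected pack hints, and repacked
at the end, so a days-long conversion could be interrupted and resumed. A
minimal sketch of that shape (fetch_one_batch is an illustrative stand-in
for the deleted _fetch_batch):

    def fetch_in_batches(target, revision_ids, fetch_one_batch,
                         batch_size=100):
        # One write group per batch: an interrupted conversion keeps
        # everything committed so far, and packing can happen as we go.
        hints = []
        for offset in range(0, len(revision_ids), batch_size):
            batch = revision_ids[offset:offset + batch_size]
            target.start_write_group()
            try:
                fetch_one_batch(batch)
            except:
                target.abort_write_group()
                raise
            else:
                hint = target.commit_write_group()
                if hint:
                    hints.extend(hint)
        if hints and target._format.pack_compresses:
            target.pack(hint=hints)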
1096@@ -3882,9 +3608,6 @@
1097 self.file_ids = set([file_id for file_id, _ in
1098 self.text_index.iterkeys()])
1099 # text keys is now grouped by file_id
1100- n_weaves = len(self.file_ids)
1101- files_in_revisions = {}
1102- revisions_of_files = {}
1103 n_versions = len(self.text_index)
1104 progress_bar.update('loading text store', 0, n_versions)
1105 parent_map = self.repository.texts.get_parent_map(self.text_index)
1106@@ -3983,6 +3706,7 @@
1107 pass
1108 else:
1109 new_pack.set_write_cache_size(1024*1024)
1110+ delta_deserializer = inventory_delta.InventoryDeltaSerializer()
1111 for substream_type, substream in stream:
1112 if substream_type == 'texts':
1113 self.target_repo.texts.insert_record_stream(substream)
1114@@ -3992,7 +3716,8 @@
1115 substream)
1116 else:
1117 self._extract_and_insert_inventories(
1118- substream, src_serializer)
1119+ substream, src_serializer,
1120+ delta_deserializer.parse_text_bytes)
1121 elif substream_type == 'chk_bytes':
1122 # XXX: This doesn't support conversions, as it assumes the
1123 # conversion was done in the fetch code.
1124@@ -4049,18 +3774,40 @@
1125 self.target_repo.pack(hint=hint)
1126 return [], set()
1127
1128- def _extract_and_insert_inventories(self, substream, serializer):
1129+ def _extract_and_insert_inventories(self, substream, serializer,
1130+ parse_delta=None):
1131 """Generate a new inventory versionedfile in target, converting data.
1132
1133 The inventory is retrieved from the source, (deserializing it), and
1134 stored in the target (reserializing it in a different format).
1135 """
1136+ target_rich_root = self.target_repo._format.rich_root_data
1137+ target_tree_refs = self.target_repo._format.supports_tree_reference
1138 for record in substream:
1139+ if record.storage_kind == 'inventory-delta':
1140+ # Insert the delta directly
1141+ delta_tuple = record.get_bytes_as('inventory-delta')
1142+ basis_id, new_id, inv_delta, format_flags = delta_tuple
1143+ # Make sure the delta is compatible with the target
1144+ if format_flags[0] and not target_rich_root:
1145+ raise errors.IncompatibleRevision(self.target_repo._format)
1146+ if format_flags[1] and not target_tree_refs:
1147+ raise errors.IncompatibleRevision(self.target_repo._format)
1148+ revision_id = new_id[0]
1149+ parents = [key[0] for key in record.parents]
1150+ self.target_repo.add_inventory_by_delta(
1151+ basis_id, inv_delta, revision_id, parents)
1152+ continue
1153+ # It's not a delta, so it must be a fulltext in the source
1154+ # serializer's format.
1155 bytes = record.get_bytes_as('fulltext')
1156 revision_id = record.key[0]
1157 inv = serializer.read_inventory_from_string(bytes, revision_id)
1158 parents = [key[0] for key in record.parents]
1159 self.target_repo.add_inventory(revision_id, inv, parents)
1160+ # No need to keep holding this full inv in memory when the rest of
1161+ # the substream is likely to be all deltas.
1162+ del inv
1163
1164 def _extract_and_insert_revisions(self, substream, serializer):
1165 for record in substream:
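The new sink-side handling of 'inventory-delta' records reduces to a flag
compatibility check followed by add_inventory_by_delta. Isolated as a sketch
(record and target_repo as in the hunk above):

    def insert_inventory_delta_record(target_repo, record):
        # The serialized flags describe what the source format required;
        # refuse the record if the target cannot represent that.
        basis_id, new_id, inv_delta, format_flags = (
            record.get_bytes_as('inventory-delta'))
        rich_root, tree_refs = format_flags
        if rich_root and not target_repo._format.rich_root_data:
            raise errors.IncompatibleRevision(target_repo._format)
        if tree_refs and not target_repo._format.supports_tree_reference:
            raise errors.IncompatibleRevision(target_repo._format)
        parents = [key[0] for key in record.parents]
        target_repo.add_inventory_by_delta(
            basis_id, inv_delta, new_id[0], parents)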
1166@@ -4115,11 +3862,8 @@
1167 return [('signatures', signatures), ('revisions', revisions)]
1168
1169 def _generate_root_texts(self, revs):
1170- """This will be called by __fetch between fetching weave texts and
1171+ """This will be called by get_stream between fetching weave texts and
1172 fetching the inventory weave.
1173-
1174- Subclasses should override this if they need to generate root texts
1175- after fetching weave texts.
1176 """
1177 if self._rich_root_upgrade():
1178 import bzrlib.fetch
1179@@ -4157,9 +3901,6 @@
1180 # will be valid.
1181 for _ in self._generate_root_texts(revs):
1182 yield _
1183- # NB: This currently reopens the inventory weave in source;
1184- # using a single stream interface instead would avoid this.
1185- from_weave = self.from_repository.inventories
1186 # we fetch only the referenced inventories because we do not
1187 # know for unselected inventories whether all their required
1188 # texts are present in the other repository - it could be
1189@@ -4204,6 +3945,22 @@
1190 if not keys:
1191 # No need to stream something we don't have
1192 continue
1193+ if substream_kind == 'inventories':
1194+ # Some missing keys are genuinely ghosts, filter those out.
1195+ present = self.from_repository.inventories.get_parent_map(keys)
1196+ revs = [key[0] for key in present]
1197+ # As with the original stream, we may need to generate root
1198+ # texts for the inventories we're about to stream.
1199+ for _ in self._generate_root_texts(revs):
1200+ yield _
1201+ # Get the inventory stream more-or-less as we do for the
1202+ # original stream; there's no reason to assume that records
1203+ # direct from the source will be suitable for the sink. (Think
1204+ # e.g. 2a -> 1.9-rich-root).
1205+ for info in self._get_inventory_stream(revs, missing=True):
1206+ yield info
1207+ continue
1208+
1209 # Ask for full texts always so that we don't need more round trips
1210 # after this stream.
1211 # Some of the missing keys are genuinely ghosts, so filter absent
1212@@ -4224,129 +3981,95 @@
1213 return (not self.from_repository._format.rich_root_data and
1214 self.to_format.rich_root_data)
1215
1216- def _get_inventory_stream(self, revision_ids):
1217+ def _get_inventory_stream(self, revision_ids, missing=False):
1218 from_format = self.from_repository._format
1219- if (from_format.supports_chks and self.to_format.supports_chks
1220- and (from_format._serializer == self.to_format._serializer)):
1221- # Both sides support chks, and they use the same serializer, so it
1222- # is safe to transmit the chk pages and inventory pages across
1223- # as-is.
1224- return self._get_chk_inventory_stream(revision_ids)
1225+ if (from_format.supports_chks and self.to_format.supports_chks and
1226+ from_format.network_name() == self.to_format.network_name()):
1227+ raise AssertionError(
1228+ "this case should be handled by GroupCHKStreamSource")
1229 elif (not from_format.supports_chks):
1230 # Source repository doesn't support chks. So we can transmit the
1231 # inventories 'as-is' and either they are just accepted on the
1232 # target, or the Sink will properly convert it.
1233- return self._get_simple_inventory_stream(revision_ids)
1234+ # (XXX: this assumes that all non-chk formats are understood as-is
1235+ # by any Sink, but that presumably isn't true for foreign repo
1236+ # formats added by bzr-svn etc?)
1237+ return self._get_simple_inventory_stream(revision_ids,
1238+ missing=missing)
1239 else:
1240- # XXX: Hack to make not-chk->chk fetch: copy the inventories as
1241- # inventories. Note that this should probably be done somehow
1242- # as part of bzrlib.repository.StreamSink. Except JAM couldn't
1243- # figure out how a non-chk repository could possibly handle
1244- # deserializing an inventory stream from a chk repo, as it
1245- # doesn't have a way to understand individual pages.
1246- return self._get_convertable_inventory_stream(revision_ids)
1247+ # Make chk->non-chk (and chk with different serializers) fetch:
1248+ # copy the inventories as (format-neutral) inventory deltas.
1249+ return self._get_convertable_inventory_stream(revision_ids,
1250+ fulltexts=missing)
1251
1252- def _get_simple_inventory_stream(self, revision_ids):
1253+ def _get_simple_inventory_stream(self, revision_ids, missing=False):
1254+ # NB: This currently reopens the inventory weave in source;
1255+ # using a single stream interface instead would avoid this.
1256 from_weave = self.from_repository.inventories
1257+ if missing:
1258+ delta_closure = True
1259+ else:
1260+ delta_closure = not self.delta_on_metadata()
1261 yield ('inventories', from_weave.get_record_stream(
1262 [(rev_id,) for rev_id in revision_ids],
1263- self.inventory_fetch_order(),
1264- not self.delta_on_metadata()))
1265-
1266- def _get_chk_inventory_stream(self, revision_ids):
1267- """Fetch the inventory texts, along with the associated chk maps."""
1268- # We want an inventory outside of the search set, so that we can filter
1269- # out uninteresting chk pages. For now we use
1270- # _find_revision_outside_set, but if we had a Search with cut_revs, we
1271- # could use that instead.
1272- start_rev_id = self.from_repository._find_revision_outside_set(
1273- revision_ids)
1274- start_rev_key = (start_rev_id,)
1275- inv_keys_to_fetch = [(rev_id,) for rev_id in revision_ids]
1276- if start_rev_id != _mod_revision.NULL_REVISION:
1277- inv_keys_to_fetch.append((start_rev_id,))
1278- # Any repo that supports chk_bytes must also support out-of-order
1279- # insertion. At least, that is how we expect it to work
1280- # We use get_record_stream instead of iter_inventories because we want
1281- # to be able to insert the stream as well. We could instead fetch
1282- # allowing deltas, and then iter_inventories, but we don't know whether
1283- # source or target is more 'local' anway.
1284- inv_stream = self.from_repository.inventories.get_record_stream(
1285- inv_keys_to_fetch, 'unordered',
1286- True) # We need them as full-texts so we can find their references
1287- uninteresting_chk_roots = set()
1288- interesting_chk_roots = set()
1289- def filter_inv_stream(inv_stream):
1290- for idx, record in enumerate(inv_stream):
1291- ### child_pb.update('fetch inv', idx, len(inv_keys_to_fetch))
1292- bytes = record.get_bytes_as('fulltext')
1293- chk_inv = inventory.CHKInventory.deserialise(
1294- self.from_repository.chk_bytes, bytes, record.key)
1295- if record.key == start_rev_key:
1296- uninteresting_chk_roots.add(chk_inv.id_to_entry.key())
1297- p_id_map = chk_inv.parent_id_basename_to_file_id
1298- if p_id_map is not None:
1299- uninteresting_chk_roots.add(p_id_map.key())
1300- else:
1301- yield record
1302- interesting_chk_roots.add(chk_inv.id_to_entry.key())
1303- p_id_map = chk_inv.parent_id_basename_to_file_id
1304- if p_id_map is not None:
1305- interesting_chk_roots.add(p_id_map.key())
1306- ### pb.update('fetch inventory', 0, 2)
1307- yield ('inventories', filter_inv_stream(inv_stream))
1308- # Now that we have worked out all of the interesting root nodes, grab
1309- # all of the interesting pages and insert them
1310- ### pb.update('fetch inventory', 1, 2)
1311- interesting = chk_map.iter_interesting_nodes(
1312- self.from_repository.chk_bytes, interesting_chk_roots,
1313- uninteresting_chk_roots)
1314- def to_stream_adapter():
1315- """Adapt the iter_interesting_nodes result to a single stream.
1316-
1317- iter_interesting_nodes returns records as it processes them, along
1318- with keys. However, we only want to return the records themselves.
1319- """
1320- for record, items in interesting:
1321- if record is not None:
1322- yield record
1323- # XXX: We could instead call get_record_stream(records.keys())
1324- # ATM, this will always insert the records as fulltexts, and
1325- # requires that you can hang on to records once you have gone
1326- # on to the next one. Further, it causes the target to
1327- # recompress the data. Testing shows it to be faster than
1328- # requesting the records again, though.
1329- yield ('chk_bytes', to_stream_adapter())
1330- ### pb.update('fetch inventory', 2, 2)
1331-
1332- def _get_convertable_inventory_stream(self, revision_ids):
1333- # XXX: One of source or target is using chks, and they don't have
1334- # compatible serializations. The StreamSink code expects to be
1335- # able to convert on the target, so we need to put
1336- # bytes-on-the-wire that can be converted
1337- yield ('inventories', self._stream_invs_as_fulltexts(revision_ids))
1338-
1339- def _stream_invs_as_fulltexts(self, revision_ids):
1340+ self.inventory_fetch_order(), delta_closure))
1341+
1342+ def _get_convertable_inventory_stream(self, revision_ids, fulltexts=False):
1343+ # The source is using CHKs, but the target either doesn't or has a
1344+ # different serializer. The StreamSink code expects to be able to
1345+ # convert on the target, so we need to put bytes-on-the-wire that can
1346+ # be converted. That means inventory deltas (if the remote is <1.18,
1348+ # RemoteStreamSink will fall back to VFS to insert the deltas).
1348+ yield ('inventories',
1349+ self._stream_invs_as_deltas(revision_ids, fulltexts=fulltexts))
1350+
1351+ def _stream_invs_as_deltas(self, revision_ids, fulltexts=False):
1352 from_repo = self.from_repository
1353- from_serializer = from_repo._format._serializer
1354 revision_keys = [(rev_id,) for rev_id in revision_ids]
1355 parent_map = from_repo.inventories.get_parent_map(revision_keys)
1356- for inv in self.from_repository.iter_inventories(revision_ids):
1357- # XXX: This is a bit hackish, but it works. Basically,
1358- # CHKSerializer 'accidentally' supports
1359- # read/write_inventory_to_string, even though that is never
1360- # the format that is stored on disk. It *does* give us a
1361- # single string representation for an inventory, so live with
1362- # it for now.
1363- # This would be far better if we had a 'serialized inventory
1364- # delta' form. Then we could use 'inventory._make_delta', and
1365- # transmit that. This would both be faster to generate, and
1366- # result in fewer bytes-on-the-wire.
1367- as_bytes = from_serializer.write_inventory_to_string(inv)
1368+ # XXX: possibly repos could implement a more efficient iter_inv_deltas
1369+ # method...
1370+ inventories = self.from_repository.iter_inventories(
1371+ revision_ids, 'topological')
1372+ # XXX: ideally these flags would be per-revision, not per-repo (e.g.
1373+ # streaming a non-rich-root revision out of a rich-root repo back into
1374+ # a non-rich-root repo ought to be allowed)
1375+ format = from_repo._format
1376+ flags = (format.rich_root_data, format.supports_tree_reference)
1377+ invs_sent_so_far = set([_mod_revision.NULL_REVISION])
1378+ for inv in inventories:
1379 key = (inv.revision_id,)
1380- parent_keys = parent_map.get(key, ())
1381- yield versionedfile.FulltextContentFactory(
1382- key, parent_keys, None, as_bytes)
1383+ parents = parent_map.get(key, ())
1384+ if fulltexts or parents == ():
1385+ # Either the caller asked for fulltexts, or there is no parent,
1386+ # so stream as a delta from null:.
1387+ basis_id = _mod_revision.NULL_REVISION
1388+ parent_inv = Inventory(None)
1389+ delta = inv._make_delta(parent_inv)
1390+ else:
1391+ # Make a delta against each parent so that we can find the
1392+ # smallest.
1393+ best_delta = None
1394+ parent_ids = [parent_key[0] for parent_key in parents]
1395+ parent_ids.append(_mod_revision.NULL_REVISION)
1396+ for parent_id in parent_ids:
1397+ if parent_id not in invs_sent_so_far:
1398+ # We don't know that the remote side has this basis, so
1399+ # we can't use it.
1400+ continue
1401+ if parent_id == _mod_revision.NULL_REVISION:
1402+ parent_inv = Inventory(None)
1403+ else:
1404+ parent_inv = from_repo.get_inventory(parent_id)
1405+ candidate_delta = inv._make_delta(parent_inv)
1406+ if (best_delta is None or
1407+ len(best_delta) > len(candidate_delta)):
1408+ best_delta = candidate_delta
1409+ basis_id = parent_id
1410+ delta = best_delta
1411+ invs_sent_so_far.add(basis_id)
1412+ yield versionedfile.InventoryDeltaContentFactory(
1413+ key, parents, None, delta, basis_id, flags, from_repo)
1414
1415
1416 def _iter_for_revno(repo, partial_history_cache, stop_index=None,
1417
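The basis selection in _stream_invs_as_deltas is the part that matters for
wire efficiency: only inventories the remote side is known to have (those
already sent, plus null:) are legal bases, and the shortest delta among them
wins. As a standalone sketch (get_inventory is assumed to return an empty
Inventory(None) for NULL_REVISION):

    def choose_basis(inv, parent_ids, invs_sent_so_far, get_inventory):
        # NULL_REVISION is always usable; parents only if already sent.
        best_basis, best_delta = None, None
        for parent_id in parent_ids + [NULL_REVISION]:
            if parent_id not in invs_sent_so_far:
                continue
            candidate = inv._make_delta(get_inventory(parent_id))
            if best_delta is None or len(candidate) < len(best_delta):
                best_basis, best_delta = parent_id, candidate
        return best_basis, best_delta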
1418=== modified file 'bzrlib/smart/protocol.py'
1419--- bzrlib/smart/protocol.py 2009-07-17 01:48:56 +0000
1420+++ bzrlib/smart/protocol.py 2009-07-22 00:35:23 +0000
1421@@ -1209,6 +1209,8 @@
1422 except (KeyboardInterrupt, SystemExit):
1423 raise
1424 except Exception:
1425+ mutter('_iter_with_errors caught error')
1426+ log_exception_quietly()
1427 yield sys.exc_info(), None
1428 return
1429
1430
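The two lines added to _iter_with_errors just log the exception before it is
handed to the consumer; the surrounding pattern (visible in the context
above) is an iterator wrapper yielding (exc_info, value) pairs, roughly:

    def _iter_with_errors(iterable):
        # Yield (None, item) for each item; on failure, log and yield
        # (sys.exc_info(), None) once, then stop.
        iterator = iter(iterable)
        while True:
            try:
                yield None, iterator.next()
            except StopIteration:
                return
            except (KeyboardInterrupt, SystemExit):
                raise
            except Exception:
                mutter('_iter_with_errors caught error')
                log_exception_quietly()
                yield sys.exc_info(), None
                return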
1431=== modified file 'bzrlib/smart/repository.py'
1432--- bzrlib/smart/repository.py 2009-06-16 06:46:32 +0000
1433+++ bzrlib/smart/repository.py 2009-07-22 00:35:23 +0000
1434@@ -30,6 +30,7 @@
1435 graph,
1436 osutils,
1437 pack,
1438+ versionedfile,
1439 )
1440 from bzrlib.bzrdir import BzrDir
1441 from bzrlib.smart.request import (
1442@@ -39,7 +40,11 @@
1443 )
1444 from bzrlib.repository import _strip_NULL_ghosts, network_format_registry
1445 from bzrlib import revision as _mod_revision
1446-from bzrlib.versionedfile import NetworkRecordStream, record_to_fulltext_bytes
1447+from bzrlib.versionedfile import (
1448+ NetworkRecordStream,
1449+ record_to_fulltext_bytes,
1450+ record_to_inventory_delta_bytes,
1451+ )
1452
1453
1454 class SmartServerRepositoryRequest(SmartServerRequest):
1455@@ -414,8 +419,39 @@
1456 repository.
1457 """
1458 self._to_format = network_format_registry.get(to_network_name)
1459+ if self._should_fake_unknown():
1460+ return FailedSmartServerResponse(
1461+ ('UnknownMethod', 'Repository.get_stream'))
1462 return None # Signal that we want a body.
1463
1464+ def _should_fake_unknown(self):
1465+ # This is a workaround for bugs in pre-1.18 clients that claim to
1466+ # support receiving streams of CHK repositories. The pre-1.18 client
1467+ # expects inventory records to be serialized in the format defined by
1468+ # to_network_name, but in pre-1.18 (at least) that format definition
1469+ # tries to use the xml5 serializer, which does not correctly handle
1470+ # rich-roots. After 1.18 the client can also accept inventory-deltas
1471+ # (which avoids this issue), and those clients will use the
1472+ # Repository.get_stream_1.18 verb instead of this one.
1473+ # So: if this repository is CHK, and the to_format doesn't match,
1474+ # we should just fake an UnknownSmartMethod error so that the client
1475+ # will fall back to VFS, rather than sending it a stream we know it
1476+ # cannot handle.
1477+ from_format = self._repository._format
1478+ to_format = self._to_format
1479+ if not from_format.supports_chks:
1480+ # Source not CHK: that's ok
1481+ return False
1482+ if (to_format.supports_chks and
1483+ from_format.repository_class is to_format.repository_class and
1484+ from_format._serializer == to_format._serializer):
1485+ # Source is CHK, but target matches: that's ok
1486+ # (e.g. 2a->2a, or CHK2->2a)
1487+ return False
1488+ # Source is CHK, and target is not CHK or incompatible CHK. We can't
1489+ # generate a compatible stream.
1490+ return True
1491+
1492 def do_body(self, body_bytes):
1493 repository = self._repository
1494 repository.lock_read()
1495@@ -451,6 +487,14 @@
1496 repository.unlock()
1497
1498
1499+class SmartServerRepositoryGetStream_1_18(SmartServerRepositoryGetStream):
1500+
1501+ def _should_fake_unknown(self):
1502+ # The client is at least 1.18, so we don't need to work around any
1503+ # bugs.
1504+ return False
1505+
1506+
1507 def _stream_to_byte_stream(stream, src_format):
1508 """Convert a record stream to a self delimited byte stream."""
1509 pack_writer = pack.ContainerSerialiser()
1510@@ -460,6 +504,8 @@
1511 for record in substream:
1512 if record.storage_kind in ('chunked', 'fulltext'):
1513 serialised = record_to_fulltext_bytes(record)
1514+ elif record.storage_kind == 'inventory-delta':
1515+ serialised = record_to_inventory_delta_bytes(record)
1516 elif record.storage_kind == 'absent':
1517 raise ValueError("Absent factory for %s" % (record.key,))
1518 else:
1519@@ -650,6 +696,23 @@
1520 return SuccessfulSmartServerResponse(('ok', ))
1521
1522
1523+class SmartServerRepositoryInsertStream_1_18(SmartServerRepositoryInsertStreamLocked):
1524+ """Insert a record stream from a RemoteSink into a repository.
1525+
1526+ Same as SmartServerRepositoryInsertStreamLocked, except:
1527+ - the lock token argument is optional
1528+ - servers that implement this verb accept 'inventory-delta' records in the
1529+ stream.
1530+
1531+ New in 1.18.
1532+ """
1533+
1534+ def do_repository_request(self, repository, resume_tokens, lock_token=None):
1535+ """StreamSink.insert_stream for a remote repository."""
1536+ SmartServerRepositoryInsertStreamLocked.do_repository_request(
1537+ self, repository, resume_tokens, lock_token)
1538+
1539+
1540 class SmartServerRepositoryInsertStream(SmartServerRepositoryInsertStreamLocked):
1541 """Insert a record stream from a RemoteSink into an unlocked repository.
1542
1543
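_should_fake_unknown is effectively a three-row decision table; condensed
(same logic as the method above, with comments citing the cases it names):

    def should_fake_unknown(from_format, to_format):
        if not from_format.supports_chks:
            # Non-CHK source: any client copes with the stream.
            return False
        if (to_format.supports_chks and
            from_format.repository_class is to_format.repository_class and
            from_format._serializer == to_format._serializer):
            # Matching CHK formats (e.g. 2a->2a, or CHK2->2a): records
            # pass through as-is.
            return False
        # CHK source, non-CHK or incompatible CHK target: pretend the verb
        # is unknown so pre-1.18 clients fall back to VFS.
        return True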
1544=== modified file 'bzrlib/smart/request.py'
1545--- bzrlib/smart/request.py 2009-07-17 01:47:01 +0000
1546+++ bzrlib/smart/request.py 2009-07-22 00:35:23 +0000
1547@@ -550,6 +550,8 @@
1548 request_handlers.register_lazy(
1549 'Repository.insert_stream', 'bzrlib.smart.repository', 'SmartServerRepositoryInsertStream')
1550 request_handlers.register_lazy(
1551+ 'Repository.insert_stream_1.18', 'bzrlib.smart.repository', 'SmartServerRepositoryInsertStream_1_18')
1552+request_handlers.register_lazy(
1553 'Repository.insert_stream_locked', 'bzrlib.smart.repository', 'SmartServerRepositoryInsertStreamLocked')
1554 request_handlers.register_lazy(
1555 'Repository.is_shared', 'bzrlib.smart.repository', 'SmartServerRepositoryIsShared')
1556@@ -567,6 +569,9 @@
1557 'Repository.get_stream', 'bzrlib.smart.repository',
1558 'SmartServerRepositoryGetStream')
1559 request_handlers.register_lazy(
1560+ 'Repository.get_stream_1.18', 'bzrlib.smart.repository',
1561+ 'SmartServerRepositoryGetStream_1_18')
1562+request_handlers.register_lazy(
1563 'Repository.tarball', 'bzrlib.smart.repository',
1564 'SmartServerRepositoryTarball')
1565 request_handlers.register_lazy(
1566
1567=== modified file 'bzrlib/tests/__init__.py'
1568--- bzrlib/tests/__init__.py 2009-07-20 04:22:47 +0000
1569+++ bzrlib/tests/__init__.py 2009-07-22 00:35:23 +0000
1570@@ -1912,6 +1912,16 @@
1571 sio.encoding = output_encoding
1572 return sio
1573
1574+ def disable_verb(self, verb):
1575+ """Disable a smart server verb for one test."""
1576+ from bzrlib.smart import request
1577+ request_handlers = request.request_handlers
1578+ orig_method = request_handlers.get(verb)
1579+ request_handlers.remove(verb)
1580+ def restoreVerb():
1581+ request_handlers.register(verb, orig_method)
1582+ self.addCleanup(restoreVerb)
1583+
1584
1585 class CapturedCall(object):
1586 """A helper for capturing smart server calls for easy debug analysis."""
1587
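disable_verb makes the backwards-compatibility paths cheap to exercise from
any TestCase: remove the new verb for one test and the client is forced
through the old-verb (and, for inventory-deltas, VFS) fallback. Usage,
mirroring the new per_interrepository test further down:

    def test_fetch_parent_inventories_smart_old(self):
        self.setup_smart_server_with_call_log()
        # Without the 1.18 verb the client must degrade gracefully.
        self.disable_verb('Repository.insert_stream_1.18')
        self.test_fetch_parent_inventories_at_stacking_boundary()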
1588=== modified file 'bzrlib/tests/per_branch/test_push.py'
1589--- bzrlib/tests/per_branch/test_push.py 2009-07-10 05:49:34 +0000
1590+++ bzrlib/tests/per_branch/test_push.py 2009-07-22 00:35:23 +0000
1591@@ -260,14 +260,15 @@
1592 self.assertFalse(local.is_locked())
1593 local.push(remote)
1594 hpss_call_names = [item.call.method for item in self.hpss_calls]
1595- self.assertTrue('Repository.insert_stream' in hpss_call_names)
1596- insert_stream_idx = hpss_call_names.index('Repository.insert_stream')
1597+ self.assertTrue('Repository.insert_stream_1.18' in hpss_call_names)
1598+ insert_stream_idx = hpss_call_names.index(
1599+ 'Repository.insert_stream_1.18')
1600 calls_after_insert_stream = hpss_call_names[insert_stream_idx:]
1601 # After inserting the stream the client has no reason to query the
1602 # remote graph any further.
1603 self.assertEqual(
1604- ['Repository.insert_stream', 'Repository.insert_stream', 'get',
1605- 'Branch.set_last_revision_info', 'Branch.unlock'],
1606+ ['Repository.insert_stream_1.18', 'Repository.insert_stream_1.18',
1607+ 'get', 'Branch.set_last_revision_info', 'Branch.unlock'],
1608 calls_after_insert_stream)
1609
1610 def disableOptimisticGetParentMap(self):
1611
1612=== modified file 'bzrlib/tests/per_interbranch/test_push.py'
1613--- bzrlib/tests/per_interbranch/test_push.py 2009-07-10 05:49:34 +0000
1614+++ bzrlib/tests/per_interbranch/test_push.py 2009-07-22 00:35:23 +0000
1615@@ -266,14 +266,15 @@
1616 self.assertFalse(local.is_locked())
1617 local.push(remote)
1618 hpss_call_names = [item.call.method for item in self.hpss_calls]
1619- self.assertTrue('Repository.insert_stream' in hpss_call_names)
1620- insert_stream_idx = hpss_call_names.index('Repository.insert_stream')
1621+ self.assertTrue('Repository.insert_stream_1.18' in hpss_call_names)
1622+ insert_stream_idx = hpss_call_names.index(
1623+ 'Repository.insert_stream_1.18')
1624 calls_after_insert_stream = hpss_call_names[insert_stream_idx:]
1625 # After inserting the stream the client has no reason to query the
1626 # remote graph any further.
1627 self.assertEqual(
1628- ['Repository.insert_stream', 'Repository.insert_stream', 'get',
1629- 'Branch.set_last_revision_info', 'Branch.unlock'],
1630+ ['Repository.insert_stream_1.18', 'Repository.insert_stream_1.18',
1631+ 'get', 'Branch.set_last_revision_info', 'Branch.unlock'],
1632 calls_after_insert_stream)
1633
1634 def disableOptimisticGetParentMap(self):
1635
1636=== modified file 'bzrlib/tests/per_interrepository/__init__.py'
1637--- bzrlib/tests/per_interrepository/__init__.py 2009-07-10 06:46:10 +0000
1638+++ bzrlib/tests/per_interrepository/__init__.py 2009-07-22 00:35:23 +0000
1639@@ -32,8 +32,6 @@
1640 )
1641
1642 from bzrlib.repository import (
1643- InterDifferingSerializer,
1644- InterKnitRepo,
1645 InterRepository,
1646 )
1647 from bzrlib.tests import (
1648@@ -51,15 +49,13 @@
1649 (interrepo_class, repository_format, repository_format_to).
1650 """
1651 result = []
1652- for interrepo_class, repository_format, repository_format_to in formats:
1653- id = '%s,%s,%s' % (interrepo_class.__name__,
1654- repository_format.__class__.__name__,
1655- repository_format_to.__class__.__name__)
1656+ for repository_format, repository_format_to in formats:
1657+ id = '%s,%s' % (repository_format.__class__.__name__,
1658+ repository_format_to.__class__.__name__)
1659 scenario = (id,
1660 {"transport_server": transport_server,
1661 "transport_readonly_server": transport_readonly_server,
1662 "repository_format": repository_format,
1663- "interrepo_class": interrepo_class,
1664 "repository_format_to": repository_format_to,
1665 })
1666 result.append(scenario)
1667@@ -68,8 +64,12 @@
1668
1669 def default_test_list():
1670 """Generate the default list of interrepo permutations to test."""
1671- from bzrlib.repofmt import knitrepo, pack_repo, weaverepo
1672+ from bzrlib.repofmt import (
1673+ knitrepo, pack_repo, weaverepo, groupcompress_repo,
1674+ )
1675 result = []
1676+ def add_combo(from_format, to_format):
1677+ result.append((from_format, to_format))
1678 # test the default InterRepository between format 6 and the current
1679 # default format.
1680 # XXX: robertc 20060220 reinstate this when there are two supported
1681@@ -80,37 +80,33 @@
1682 for optimiser_class in InterRepository._optimisers:
1683 format_to_test = optimiser_class._get_repo_format_to_test()
1684 if format_to_test is not None:
1685- result.append((optimiser_class,
1686- format_to_test, format_to_test))
1687+ add_combo(format_to_test, format_to_test)
1688 # if there are specific combinations we want to use, we can add them
1689 # here. We want to test rich root upgrading.
1690- result.append((InterRepository,
1691- weaverepo.RepositoryFormat5(),
1692- knitrepo.RepositoryFormatKnit3()))
1693- result.append((InterRepository,
1694- knitrepo.RepositoryFormatKnit1(),
1695- knitrepo.RepositoryFormatKnit3()))
1696- result.append((InterRepository,
1697- knitrepo.RepositoryFormatKnit1(),
1698- knitrepo.RepositoryFormatKnit3()))
1699- result.append((InterKnitRepo,
1700- knitrepo.RepositoryFormatKnit1(),
1701- pack_repo.RepositoryFormatKnitPack1()))
1702- result.append((InterKnitRepo,
1703- pack_repo.RepositoryFormatKnitPack1(),
1704- knitrepo.RepositoryFormatKnit1()))
1705- result.append((InterKnitRepo,
1706- knitrepo.RepositoryFormatKnit3(),
1707- pack_repo.RepositoryFormatKnitPack3()))
1708- result.append((InterKnitRepo,
1709- pack_repo.RepositoryFormatKnitPack3(),
1710- knitrepo.RepositoryFormatKnit3()))
1711- result.append((InterKnitRepo,
1712- pack_repo.RepositoryFormatKnitPack3(),
1713- pack_repo.RepositoryFormatKnitPack4()))
1714- result.append((InterDifferingSerializer,
1715- pack_repo.RepositoryFormatKnitPack1(),
1716- pack_repo.RepositoryFormatKnitPack6RichRoot()))
1717+ add_combo(weaverepo.RepositoryFormat5(),
1718+ knitrepo.RepositoryFormatKnit3())
1719+ add_combo(knitrepo.RepositoryFormatKnit1(),
1720+ knitrepo.RepositoryFormatKnit3())
1721+ add_combo(knitrepo.RepositoryFormatKnit1(),
1722+ pack_repo.RepositoryFormatKnitPack1())
1723+ add_combo(pack_repo.RepositoryFormatKnitPack1(),
1724+ knitrepo.RepositoryFormatKnit1())
1725+ add_combo(knitrepo.RepositoryFormatKnit3(),
1726+ pack_repo.RepositoryFormatKnitPack3())
1727+ add_combo(pack_repo.RepositoryFormatKnitPack3(),
1728+ knitrepo.RepositoryFormatKnit3())
1729+ add_combo(pack_repo.RepositoryFormatKnitPack3(),
1730+ pack_repo.RepositoryFormatKnitPack4())
1731+ add_combo(pack_repo.RepositoryFormatKnitPack1(),
1732+ pack_repo.RepositoryFormatKnitPack6RichRoot())
1733+ add_combo(pack_repo.RepositoryFormatKnitPack6RichRoot(),
1734+ groupcompress_repo.RepositoryFormat2a())
1735+ add_combo(groupcompress_repo.RepositoryFormat2a(),
1736+ pack_repo.RepositoryFormatKnitPack6RichRoot())
1737+ add_combo(groupcompress_repo.RepositoryFormatCHK2(),
1738+ groupcompress_repo.RepositoryFormat2a())
1739+ add_combo(groupcompress_repo.RepositoryFormatCHK1(),
1740+ groupcompress_repo.RepositoryFormat2a())
1741 return result
1742
1743
1744
1745=== modified file 'bzrlib/tests/per_interrepository/test_fetch.py'
1746--- bzrlib/tests/per_interrepository/test_fetch.py 2009-07-10 06:46:10 +0000
1747+++ bzrlib/tests/per_interrepository/test_fetch.py 2009-07-22 00:35:23 +0000
1748@@ -28,6 +28,9 @@
1749 from bzrlib.errors import (
1750 NoSuchRevision,
1751 )
1752+from bzrlib.graph import (
1753+ SearchResult,
1754+ )
1755 from bzrlib.revision import (
1756 NULL_REVISION,
1757 Revision,
1758@@ -124,6 +127,15 @@
1759 to_repo.texts.get_record_stream([('foo', revid)],
1760 'unordered', True).next().get_bytes_as('fulltext'))
1761
1762+ def test_fetch_parent_inventories_at_stacking_boundary_smart(self):
1763+ self.setup_smart_server_with_call_log()
1764+ self.test_fetch_parent_inventories_at_stacking_boundary()
1765+
1766+ def test_fetch_parent_inventories_at_stacking_boundary_smart_old(self):
1767+ self.setup_smart_server_with_call_log()
1768+ self.disable_verb('Repository.insert_stream_1.18')
1769+ self.test_fetch_parent_inventories_at_stacking_boundary()
1770+
1771 def test_fetch_parent_inventories_at_stacking_boundary(self):
1772 """Fetch to a stacked branch copies inventories for parents of
1773 revisions at the stacking boundary.
1774@@ -156,11 +168,25 @@
1775 unstacked_repo = stacked_branch.bzrdir.open_repository()
1776 unstacked_repo.lock_read()
1777 self.addCleanup(unstacked_repo.unlock)
1778- self.assertFalse(unstacked_repo.has_revision('left'))
1779- self.assertFalse(unstacked_repo.has_revision('right'))
1780- self.assertEqual(
1781- set([('left',), ('right',), ('merge',)]),
1782- unstacked_repo.inventories.keys())
1783+ if not unstacked_repo._format.supports_chks:
1784+ # these assertions aren't valid for groupcompress repos, which may
1785+ # transfer more data than strictly necessary to avoid breaking up an
1786+ # already-compressed block of data.
1787+ self.assertFalse(unstacked_repo.has_revision('left'))
1788+ self.assertFalse(unstacked_repo.has_revision('right'))
1789+ self.assertTrue(unstacked_repo.has_revision('merge'))
1790+ # We used to check for the presence of parent invs here, but what
1791+ # really matters is that the repo can stream the new revision without
1792+ # the help of any fallback repos.
1793+ self.assertCanStreamRevision(unstacked_repo, 'merge')
1794+
1795+ def assertCanStreamRevision(self, repo, revision_id):
1796+ exclude_keys = set(repo.all_revision_ids()) - set([revision_id])
1797+ search = SearchResult([revision_id], exclude_keys, 1, [revision_id])
1798+ source = repo._get_source(repo._format)
1799+ for substream_kind, substream in source.get_stream(search):
1800+ # Consume the substream
1801+ list(substream)
1802
1803 def test_fetch_missing_basis_text(self):
1804 """If fetching a delta, we should die if a basis is not present."""
1805@@ -276,7 +302,10 @@
1806 to_repo = self.make_to_repository('to')
1807 to_repo.fetch(from_tree.branch.repository)
1808 recorded_inv_sha1 = to_repo.get_inventory_sha1('foo-id')
1809- xml = to_repo.get_inventory_xml('foo-id')
1810+ try:
1811+ xml = to_repo.get_inventory_xml('foo-id')
1812+ except NotImplementedError:
1813+ raise TestNotApplicable('repo does not provide get_inventory_xml')
1814 computed_inv_sha1 = osutils.sha_string(xml)
1815 self.assertEqual(computed_inv_sha1, recorded_inv_sha1)
1816
1817
1818=== modified file 'bzrlib/tests/test_inventory_delta.py'
1819--- bzrlib/tests/test_inventory_delta.py 2009-04-02 05:53:12 +0000
1820+++ bzrlib/tests/test_inventory_delta.py 2009-07-22 00:35:23 +0000
1821@@ -93,30 +93,26 @@
1822 """Test InventoryDeltaSerializer.parse_text_bytes."""
1823
1824 def test_parse_no_bytes(self):
1825- serializer = inventory_delta.InventoryDeltaSerializer(
1826- versioned_root=True, tree_references=True)
1827+ serializer = inventory_delta.InventoryDeltaSerializer()
1828 err = self.assertRaises(
1829 errors.BzrError, serializer.parse_text_bytes, '')
1830- self.assertContainsRe(str(err), 'unknown format')
1831+ self.assertContainsRe(str(err), 'last line not empty')
1832
1833 def test_parse_bad_format(self):
1834- serializer = inventory_delta.InventoryDeltaSerializer(
1835- versioned_root=True, tree_references=True)
1836+ serializer = inventory_delta.InventoryDeltaSerializer()
1837 err = self.assertRaises(errors.BzrError,
1838 serializer.parse_text_bytes, 'format: foo\n')
1839 self.assertContainsRe(str(err), 'unknown format')
1840
1841 def test_parse_no_parent(self):
1842- serializer = inventory_delta.InventoryDeltaSerializer(
1843- versioned_root=True, tree_references=True)
1844+ serializer = inventory_delta.InventoryDeltaSerializer()
1845 err = self.assertRaises(errors.BzrError,
1846 serializer.parse_text_bytes,
1847 'format: bzr inventory delta v1 (bzr 1.14)\n')
1848 self.assertContainsRe(str(err), 'missing parent: marker')
1849
1850 def test_parse_no_version(self):
1851- serializer = inventory_delta.InventoryDeltaSerializer(
1852- versioned_root=True, tree_references=True)
1853+ serializer = inventory_delta.InventoryDeltaSerializer()
1854 err = self.assertRaises(errors.BzrError,
1855 serializer.parse_text_bytes,
1856 'format: bzr inventory delta v1 (bzr 1.14)\n'
1857@@ -124,8 +120,7 @@
1858 self.assertContainsRe(str(err), 'missing version: marker')
1859
1860 def test_parse_duplicate_key_errors(self):
1861- serializer = inventory_delta.InventoryDeltaSerializer(
1862- versioned_root=True, tree_references=True)
1863+ serializer = inventory_delta.InventoryDeltaSerializer()
1864 double_root_lines = \
1865 """format: bzr inventory delta v1 (bzr 1.14)
1866 parent: null:
1867@@ -140,19 +135,20 @@
1868 self.assertContainsRe(str(err), 'duplicate file id')
1869
1870 def test_parse_versioned_root_only(self):
1871- serializer = inventory_delta.InventoryDeltaSerializer(
1872- versioned_root=True, tree_references=True)
1873+ serializer = inventory_delta.InventoryDeltaSerializer()
1874+ serializer.require_flags(versioned_root=True, tree_references=True)
1875 parse_result = serializer.parse_text_bytes(root_only_lines)
1876 expected_entry = inventory.make_entry(
1877 'directory', u'', None, 'an-id')
1878 expected_entry.revision = 'a@e\xc3\xa5ample.com--2004'
1879 self.assertEqual(
1880- ('null:', 'entry-version', [(None, '/', 'an-id', expected_entry)]),
1881+ ('null:', 'entry-version', True, True,
1882+ [(None, '', 'an-id', expected_entry)]),
1883 parse_result)
1884
1885 def test_parse_special_revid_not_valid_last_mod(self):
1886- serializer = inventory_delta.InventoryDeltaSerializer(
1887- versioned_root=False, tree_references=True)
1888+ serializer = inventory_delta.InventoryDeltaSerializer()
1889+ serializer.require_flags(versioned_root=False, tree_references=True)
1890 root_only_lines = """format: bzr inventory delta v1 (bzr 1.14)
1891 parent: null:
1892 version: null:
1893@@ -165,8 +161,8 @@
1894 self.assertContainsRe(str(err), 'special revisionid found')
1895
1896 def test_parse_versioned_root_versioned_disabled(self):
1897- serializer = inventory_delta.InventoryDeltaSerializer(
1898- versioned_root=False, tree_references=True)
1899+ serializer = inventory_delta.InventoryDeltaSerializer()
1900+ serializer.require_flags(versioned_root=False, tree_references=True)
1901 root_only_lines = """format: bzr inventory delta v1 (bzr 1.14)
1902 parent: null:
1903 version: null:
1904@@ -179,8 +175,8 @@
1905 self.assertContainsRe(str(err), 'Versioned root found')
1906
1907 def test_parse_unique_root_id_root_versioned_disabled(self):
1908- serializer = inventory_delta.InventoryDeltaSerializer(
1909- versioned_root=False, tree_references=True)
1910+ serializer = inventory_delta.InventoryDeltaSerializer()
1911+ serializer.require_flags(versioned_root=False, tree_references=True)
1912 root_only_lines = """format: bzr inventory delta v1 (bzr 1.14)
1913 parent: null:
1914 version: null:
1915@@ -193,21 +189,80 @@
1916 self.assertContainsRe(str(err), 'Versioned root found')
1917
1918 def test_parse_unversioned_root_versioning_enabled(self):
1919- serializer = inventory_delta.InventoryDeltaSerializer(
1920- versioned_root=True, tree_references=True)
1921+ serializer = inventory_delta.InventoryDeltaSerializer()
1922+ serializer.require_flags(versioned_root=True, tree_references=True)
1923 err = self.assertRaises(errors.BzrError,
1924 serializer.parse_text_bytes, root_only_unversioned)
1925 self.assertContainsRe(
1926 str(err), 'serialized versioned_root flag is wrong: False')
1927
1928 def test_parse_tree_when_disabled(self):
1929- serializer = inventory_delta.InventoryDeltaSerializer(
1930- versioned_root=True, tree_references=False)
1931+ serializer = inventory_delta.InventoryDeltaSerializer()
1932+ serializer.require_flags(versioned_root=True, tree_references=False)
1933 err = self.assertRaises(errors.BzrError,
1934 serializer.parse_text_bytes, reference_lines)
1935 self.assertContainsRe(
1936 str(err), 'serialized tree_references flag is wrong: True')
1937
1938+ def test_parse_tree_when_header_disallows(self):
1939+ # A deserializer that allows tree_references to be set or unset.
1940+ serializer = inventory_delta.InventoryDeltaSerializer()
1941+ # A serialised inventory delta with a header saying no tree refs, but
1942+ # that has a tree ref in its content.
1943+ lines = """format: bzr inventory delta v1 (bzr 1.14)
1944+parent: null:
1945+version: entry-version
1946+versioned_root: false
1947+tree_references: false
1948+None\x00/foo\x00id\x00TREE_ROOT\x00changed\x00tree\x00subtree-version
1949+"""
1950+ err = self.assertRaises(errors.BzrError,
1951+ serializer.parse_text_bytes, lines)
1952+ self.assertContainsRe(str(err), 'Tree reference found')
1953+
1954+ def test_parse_versioned_root_when_header_disallows(self):
1955+ # A deserializer that allows versioned_root to be set or unset.
1956+ serializer = inventory_delta.InventoryDeltaSerializer()
1957+ # A serialised inventory delta with a header saying no versioned
1958+ # root, but that has a versioned root in its content.
1959+ lines = """format: bzr inventory delta v1 (bzr 1.14)
1960+parent: null:
1961+version: entry-version
1962+versioned_root: false
1963+tree_references: false
1964+None\x00/\x00TREE_ROOT\x00\x00a@e\xc3\xa5ample.com--2004\x00dir
1965+"""
1966+ err = self.assertRaises(errors.BzrError,
1967+ serializer.parse_text_bytes, lines)
1968+ self.assertContainsRe(str(err), 'Versioned root found')
1969+
1970+ def test_parse_last_line_not_empty(self):
1971+ """newpath must start with / if it is not None."""
1972+ # Trim the trailing newline from a valid serialization
1973+ lines = root_only_lines[:-1]
1974+ serializer = inventory_delta.InventoryDeltaSerializer()
1975+ err = self.assertRaises(errors.BzrError,
1976+ serializer.parse_text_bytes, lines)
1977+ self.assertContainsRe(str(err), 'last line not empty')
1978+
1979+ def test_parse_invalid_newpath(self):
1980+ """newpath must start with / if it is not None."""
1981+ lines = empty_lines
1982+ lines += "None\x00bad\x00TREE_ROOT\x00\x00version\x00dir\n"
1983+ serializer = inventory_delta.InventoryDeltaSerializer()
1984+ err = self.assertRaises(errors.BzrError,
1985+ serializer.parse_text_bytes, lines)
1986+ self.assertContainsRe(str(err), 'newpath invalid')
1987+
1988+ def test_parse_invalid_oldpath(self):
1989+ """oldpath must start with / if it is not None."""
1990+ lines = root_only_lines
1991+ lines += "bad\x00/new\x00file-id\x00\x00version\x00dir\n"
1992+ serializer = inventory_delta.InventoryDeltaSerializer()
1993+ err = self.assertRaises(errors.BzrError,
1994+ serializer.parse_text_bytes, lines)
1995+ self.assertContainsRe(str(err), 'oldpath invalid')
1996+
1997
1998 class TestSerialization(TestCase):
1999 """Tests for InventoryDeltaSerializer.delta_to_lines."""
2000@@ -216,8 +271,8 @@
2001 old_inv = Inventory(None)
2002 new_inv = Inventory(None)
2003 delta = new_inv._make_delta(old_inv)
2004- serializer = inventory_delta.InventoryDeltaSerializer(
2005- versioned_root=True, tree_references=True)
2006+ serializer = inventory_delta.InventoryDeltaSerializer()
2007+ serializer.require_flags(True, True)
2008 self.assertEqual(StringIO(empty_lines).readlines(),
2009 serializer.delta_to_lines(NULL_REVISION, NULL_REVISION, delta))
2010
2011@@ -228,8 +283,8 @@
2012 root.revision = 'a@e\xc3\xa5ample.com--2004'
2013 new_inv.add(root)
2014 delta = new_inv._make_delta(old_inv)
2015- serializer = inventory_delta.InventoryDeltaSerializer(
2016- versioned_root=True, tree_references=True)
2017+ serializer = inventory_delta.InventoryDeltaSerializer()
2018+ serializer.require_flags(versioned_root=True, tree_references=True)
2019 self.assertEqual(StringIO(root_only_lines).readlines(),
2020 serializer.delta_to_lines(NULL_REVISION, 'entry-version', delta))
2021
2022@@ -239,8 +294,8 @@
2023 root = new_inv.make_entry('directory', '', None, 'TREE_ROOT')
2024 new_inv.add(root)
2025 delta = new_inv._make_delta(old_inv)
2026- serializer = inventory_delta.InventoryDeltaSerializer(
2027- versioned_root=False, tree_references=False)
2028+ serializer = inventory_delta.InventoryDeltaSerializer()
2029+ serializer.require_flags(False, False)
2030 self.assertEqual(StringIO(root_only_unversioned).readlines(),
2031 serializer.delta_to_lines(NULL_REVISION, 'entry-version', delta))
2032
2033@@ -253,8 +308,8 @@
2034 non_root = new_inv.make_entry('directory', 'foo', root.file_id, 'id')
2035 new_inv.add(non_root)
2036 delta = new_inv._make_delta(old_inv)
2037- serializer = inventory_delta.InventoryDeltaSerializer(
2038- versioned_root=True, tree_references=True)
2039+ serializer = inventory_delta.InventoryDeltaSerializer()
2040+ serializer.require_flags(versioned_root=True, tree_references=True)
2041 err = self.assertRaises(errors.BzrError,
2042 serializer.delta_to_lines, NULL_REVISION, 'entry-version', delta)
2043 self.assertEqual(str(err), 'no version for fileid id')
2044@@ -265,8 +320,8 @@
2045 root = new_inv.make_entry('directory', '', None, 'TREE_ROOT')
2046 new_inv.add(root)
2047 delta = new_inv._make_delta(old_inv)
2048- serializer = inventory_delta.InventoryDeltaSerializer(
2049- versioned_root=True, tree_references=True)
2050+ serializer = inventory_delta.InventoryDeltaSerializer()
2051+ serializer.require_flags(versioned_root=True, tree_references=True)
2052 err = self.assertRaises(errors.BzrError,
2053 serializer.delta_to_lines, NULL_REVISION, 'entry-version', delta)
2054 self.assertEqual(str(err), 'no version for fileid TREE_ROOT')
2055@@ -278,8 +333,8 @@
2056 root.revision = 'a@e\xc3\xa5ample.com--2004'
2057 new_inv.add(root)
2058 delta = new_inv._make_delta(old_inv)
2059- serializer = inventory_delta.InventoryDeltaSerializer(
2060- versioned_root=False, tree_references=True)
2061+ serializer = inventory_delta.InventoryDeltaSerializer()
2062+ serializer.require_flags(versioned_root=False, tree_references=True)
2063 err = self.assertRaises(errors.BzrError,
2064 serializer.delta_to_lines, NULL_REVISION, 'entry-version', delta)
2065 self.assertEqual(str(err), 'Version present for / in TREE_ROOT')
2066@@ -290,8 +345,8 @@
2067 root = new_inv.make_entry('directory', '', None, 'my-rich-root-id')
2068 new_inv.add(root)
2069 delta = new_inv._make_delta(old_inv)
2070- serializer = inventory_delta.InventoryDeltaSerializer(
2071- versioned_root=False, tree_references=True)
2072+ serializer = inventory_delta.InventoryDeltaSerializer()
2073+ serializer.require_flags(versioned_root=False, tree_references=True)
2074 err = self.assertRaises(errors.BzrError,
2075 serializer.delta_to_lines, NULL_REVISION, 'entry-version', delta)
2076 self.assertEqual(
2077@@ -308,8 +363,8 @@
2078 non_root.kind = 'strangelove'
2079 new_inv.add(non_root)
2080 delta = new_inv._make_delta(old_inv)
2081- serializer = inventory_delta.InventoryDeltaSerializer(
2082- versioned_root=True, tree_references=True)
2083+ serializer = inventory_delta.InventoryDeltaSerializer()
2084+ serializer.require_flags(True, True)
2085 # we expect keyerror because there is little value wrapping this.
2086 # This test aims to prove that it errors more than how it errors.
2087 err = self.assertRaises(KeyError,
2088@@ -328,8 +383,8 @@
2089 non_root.reference_revision = 'subtree-version'
2090 new_inv.add(non_root)
2091 delta = new_inv._make_delta(old_inv)
2092- serializer = inventory_delta.InventoryDeltaSerializer(
2093- versioned_root=True, tree_references=False)
2094+ serializer = inventory_delta.InventoryDeltaSerializer()
2095+ serializer.require_flags(versioned_root=True, tree_references=False)
2096 # we expect keyerror because there is little value wrapping this.
2097 # This test aims to prove that it errors more than how it errors.
2098 err = self.assertRaises(KeyError,
2099@@ -348,23 +403,26 @@
2100 non_root.reference_revision = 'subtree-version'
2101 new_inv.add(non_root)
2102 delta = new_inv._make_delta(old_inv)
2103- serializer = inventory_delta.InventoryDeltaSerializer(
2104- versioned_root=True, tree_references=True)
2105+ serializer = inventory_delta.InventoryDeltaSerializer()
2106+ serializer.require_flags(versioned_root=True, tree_references=True)
2107 self.assertEqual(StringIO(reference_lines).readlines(),
2108 serializer.delta_to_lines(NULL_REVISION, 'entry-version', delta))
2109
2110 def test_to_inventory_root_id_versioned_not_permitted(self):
2111- delta = [(None, '/', 'TREE_ROOT', inventory.make_entry(
2112- 'directory', '', None, 'TREE_ROOT'))]
2113- serializer = inventory_delta.InventoryDeltaSerializer(False, True)
2114+ root_entry = inventory.make_entry('directory', '', None, 'TREE_ROOT')
2115+ root_entry.revision = 'some-version'
2116+ delta = [(None, '', 'TREE_ROOT', root_entry)]
2117+ serializer = inventory_delta.InventoryDeltaSerializer()
2118+ serializer.require_flags(versioned_root=False, tree_references=True)
2119 self.assertRaises(
2120 errors.BzrError, serializer.delta_to_lines, 'old-version',
2121 'new-version', delta)
2122
2123 def test_to_inventory_root_id_not_versioned(self):
2124- delta = [(None, '/', 'an-id', inventory.make_entry(
2125+ delta = [(None, '', 'an-id', inventory.make_entry(
2126 'directory', '', None, 'an-id'))]
2127- serializer = inventory_delta.InventoryDeltaSerializer(True, True)
2128+ serializer = inventory_delta.InventoryDeltaSerializer()
2129+ serializer.require_flags(versioned_root=True, tree_references=True)
2130 self.assertRaises(
2131 errors.BzrError, serializer.delta_to_lines, 'old-version',
2132 'new-version', delta)
2133@@ -374,12 +432,13 @@
2134 tree_ref = make_entry('tree-reference', 'foo', 'changed-in', 'ref-id')
2135 tree_ref.reference_revision = 'ref-revision'
2136 delta = [
2137- (None, '/', 'an-id',
2138+ (None, '', 'an-id',
2139 make_entry('directory', '', 'changed-in', 'an-id')),
2140- (None, '/foo', 'ref-id', tree_ref)
2141+ (None, 'foo', 'ref-id', tree_ref)
2142 # a file that followed the root move
2143 ]
2144- serializer = inventory_delta.InventoryDeltaSerializer(True, True)
2145+ serializer = inventory_delta.InventoryDeltaSerializer()
2146+ serializer.require_flags(versioned_root=True, tree_references=True)
2147 self.assertRaises(errors.BzrError, serializer.delta_to_lines,
2148 'old-version', 'new-version', delta)
2149
2150@@ -430,7 +489,8 @@
2151 executable=True, text_size=30, text_sha1='some-sha',
2152 revision='old-rev')),
2153 ]
2154- serializer = inventory_delta.InventoryDeltaSerializer(True, True)
2155+ serializer = inventory_delta.InventoryDeltaSerializer()
2156+ serializer.require_flags(versioned_root=True, tree_references=True)
2157 lines = serializer.delta_to_lines(NULL_REVISION, 'something', delta)
2158 expected = """format: bzr inventory delta v1 (bzr 1.14)
2159 parent: null:
2160
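For reference when reading these test vectors: the v1 text format is a short
header (format, parent, version, versioned_root, tree_references) followed
by one NUL-separated line per changed entry. Judging from the samples above,
a line decomposes as follows (a sketch, not the real parser):

    def parse_delta_line(line):
        # Fields: oldpath, newpath, file-id, parent-id, last-modified,
        # then a kind-specific content tail ('dir', 'tree\x00<revision>',
        # ...). 'None' marks an absent old/new path; real serialized
        # paths carry a leading '/'.
        fields = line.split('\x00')
        oldpath, newpath, file_id, parent_id, last_modified = fields[:5]
        content = fields[5:]
        return oldpath, newpath, file_id, parent_id, last_modified, content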
2161=== modified file 'bzrlib/tests/test_remote.py'
2162--- bzrlib/tests/test_remote.py 2009-07-16 05:22:50 +0000
2163+++ bzrlib/tests/test_remote.py 2009-07-22 00:35:23 +0000
2164@@ -31,6 +31,7 @@
2165 config,
2166 errors,
2167 graph,
2168+ inventory,
2169 pack,
2170 remote,
2171 repository,
2172@@ -38,6 +39,7 @@
2173 tests,
2174 treebuilder,
2175 urlutils,
2176+ versionedfile,
2177 )
2178 from bzrlib.branch import Branch
2179 from bzrlib.bzrdir import BzrDir, BzrDirFormat
2180@@ -332,15 +334,6 @@
2181 reference_bzrdir_format = bzrdir.format_registry.get('default')()
2182 return reference_bzrdir_format.repository_format
2183
2184- def disable_verb(self, verb):
2185- """Disable a verb for one test."""
2186- request_handlers = smart.request.request_handlers
2187- orig_method = request_handlers.get(verb)
2188- request_handlers.remove(verb)
2189- def restoreVerb():
2190- request_handlers.register(verb, orig_method)
2191- self.addCleanup(restoreVerb)
2192-
2193 def assertFinished(self, fake_client):
2194 """Assert that all of a FakeClient's expected calls have occurred."""
2195 fake_client.finished_test()
2196@@ -2220,54 +2213,214 @@
2197
2198
2199 class TestRepositoryInsertStream(TestRemoteRepository):
2200-
2201- def test_unlocked_repo(self):
2202- transport_path = 'quack'
2203- repo, client = self.setup_fake_client_and_repository(transport_path)
2204- client.add_expected_call(
2205- 'Repository.insert_stream', ('quack/', ''),
2206- 'success', ('ok',))
2207- client.add_expected_call(
2208- 'Repository.insert_stream', ('quack/', ''),
2209- 'success', ('ok',))
2210- sink = repo._get_sink()
2211- fmt = repository.RepositoryFormat.get_default_format()
2212- resume_tokens, missing_keys = sink.insert_stream([], fmt, [])
2213- self.assertEqual([], resume_tokens)
2214- self.assertEqual(set(), missing_keys)
2215- self.assertFinished(client)
2216-
2217- def test_locked_repo_with_no_lock_token(self):
2218- transport_path = 'quack'
2219- repo, client = self.setup_fake_client_and_repository(transport_path)
2220- client.add_expected_call(
2221- 'Repository.lock_write', ('quack/', ''),
2222- 'success', ('ok', ''))
2223- client.add_expected_call(
2224- 'Repository.insert_stream', ('quack/', ''),
2225- 'success', ('ok',))
2226- client.add_expected_call(
2227- 'Repository.insert_stream', ('quack/', ''),
2228- 'success', ('ok',))
2229- repo.lock_write()
2230- sink = repo._get_sink()
2231- fmt = repository.RepositoryFormat.get_default_format()
2232- resume_tokens, missing_keys = sink.insert_stream([], fmt, [])
2233- self.assertEqual([], resume_tokens)
2234- self.assertEqual(set(), missing_keys)
2235- self.assertFinished(client)
2236-
2237- def test_locked_repo_with_lock_token(self):
2238- transport_path = 'quack'
2239- repo, client = self.setup_fake_client_and_repository(transport_path)
2240- client.add_expected_call(
2241- 'Repository.lock_write', ('quack/', ''),
2242- 'success', ('ok', 'a token'))
2243- client.add_expected_call(
2244- 'Repository.insert_stream_locked', ('quack/', '', 'a token'),
2245- 'success', ('ok',))
2246- client.add_expected_call(
2247- 'Repository.insert_stream_locked', ('quack/', '', 'a token'),
2248+ """Tests for using Repository.insert_stream verb when the _1.18 variant is
2249+ not available.
2250+
2251+ This test case is very similar to TestRepositoryInsertStream_1_18.
2252+ """
2253+
2254+ def setUp(self):
2255+ TestRemoteRepository.setUp(self)
2256+ self.disable_verb('Repository.insert_stream_1.18')
2257+
2258+ def test_unlocked_repo(self):
2259+ transport_path = 'quack'
2260+ repo, client = self.setup_fake_client_and_repository(transport_path)
2261+ client.add_expected_call(
2262+ 'Repository.insert_stream_1.18', ('quack/', ''),
2263+ 'unknown', ('Repository.insert_stream_1.18',))
2264+ client.add_expected_call(
2265+ 'Repository.insert_stream', ('quack/', ''),
2266+ 'success', ('ok',))
2267+ client.add_expected_call(
2268+ 'Repository.insert_stream', ('quack/', ''),
2269+ 'success', ('ok',))
2270+ sink = repo._get_sink()
2271+ fmt = repository.RepositoryFormat.get_default_format()
2272+ resume_tokens, missing_keys = sink.insert_stream([], fmt, [])
2273+ self.assertEqual([], resume_tokens)
2274+ self.assertEqual(set(), missing_keys)
2275+ self.assertFinished(client)
2276+
2277+ def test_locked_repo_with_no_lock_token(self):
2278+ transport_path = 'quack'
2279+ repo, client = self.setup_fake_client_and_repository(transport_path)
2280+ client.add_expected_call(
2281+ 'Repository.lock_write', ('quack/', ''),
2282+ 'success', ('ok', ''))
2283+ client.add_expected_call(
2284+ 'Repository.insert_stream_1.18', ('quack/', ''),
2285+ 'unknown', ('Repository.insert_stream_1.18',))
2286+ client.add_expected_call(
2287+ 'Repository.insert_stream', ('quack/', ''),
2288+ 'success', ('ok',))
2289+ client.add_expected_call(
2290+ 'Repository.insert_stream', ('quack/', ''),
2291+ 'success', ('ok',))
2292+ repo.lock_write()
2293+ sink = repo._get_sink()
2294+ fmt = repository.RepositoryFormat.get_default_format()
2295+ resume_tokens, missing_keys = sink.insert_stream([], fmt, [])
2296+ self.assertEqual([], resume_tokens)
2297+ self.assertEqual(set(), missing_keys)
2298+ self.assertFinished(client)
2299+
2300+ def test_locked_repo_with_lock_token(self):
2301+ transport_path = 'quack'
2302+ repo, client = self.setup_fake_client_and_repository(transport_path)
2303+ client.add_expected_call(
2304+ 'Repository.lock_write', ('quack/', ''),
2305+ 'success', ('ok', 'a token'))
2306+ client.add_expected_call(
2307+ 'Repository.insert_stream_1.18', ('quack/', '', 'a token'),
2308+ 'unknown', ('Repository.insert_stream_1.18',))
2309+ client.add_expected_call(
2310+ 'Repository.insert_stream_locked', ('quack/', '', 'a token'),
2311+ 'success', ('ok',))
2312+ client.add_expected_call(
2313+ 'Repository.insert_stream_locked', ('quack/', '', 'a token'),
2314+ 'success', ('ok',))
2315+ repo.lock_write()
2316+ sink = repo._get_sink()
2317+ fmt = repository.RepositoryFormat.get_default_format()
2318+ resume_tokens, missing_keys = sink.insert_stream([], fmt, [])
2319+ self.assertEqual([], resume_tokens)
2320+ self.assertEqual(set(), missing_keys)
2321+ self.assertFinished(client)
2322+
2323+ def test_stream_with_inventory_delta(self):
2324+        """inventory-delta records cannot be sent to the
2325+        Repository.insert_stream verb, so when one is encountered the
2326+        RemoteSink immediately stops using that verb and falls back to the
2327+        VFS insert_stream.
2328+        """
2329+ transport_path = 'quack'
2330+ repo, client = self.setup_fake_client_and_repository(transport_path)
2331+ client.add_expected_call(
2332+ 'Repository.insert_stream_1.18', ('quack/', ''),
2333+ 'unknown', ('Repository.insert_stream_1.18',))
2334+ client.add_expected_call(
2335+ 'Repository.insert_stream', ('quack/', ''),
2336+ 'success', ('ok',))
2337+ client.add_expected_call(
2338+ 'Repository.insert_stream', ('quack/', ''),
2339+ 'success', ('ok',))
2340+ # Create a fake real repository for insert_stream to fall back on, so
2341+ # that we can directly see the records the RemoteSink passes to the
2342+ # real sink.
2343+ class FakeRealSink:
2344+ def __init__(self):
2345+ self.records = []
2346+ def insert_stream(self, stream, src_format, resume_tokens):
2347+ for substream_kind, substream in stream:
2348+ self.records.append(
2349+ (substream_kind, [record.key for record in substream]))
2350+ return ['fake tokens'], ['fake missing keys']
2351+ fake_real_sink = FakeRealSink()
2352+ class FakeRealRepository:
2353+ def _get_sink(self):
2354+ return fake_real_sink
2355+ repo._real_repository = FakeRealRepository()
2356+ sink = repo._get_sink()
2357+ fmt = repository.RepositoryFormat.get_default_format()
2358+ stream = self.make_stream_with_inv_deltas(fmt)
2359+ resume_tokens, missing_keys = sink.insert_stream(stream, fmt, [])
2360+        # Every record from the first inventory delta onwards should have
2361+        # been sent to the VFS sink.
2362+ expected_records = [
2363+ ('inventories', [('rev2',), ('rev3',)]),
2364+ ('texts', [('some-rev', 'some-file')])]
2365+ self.assertEqual(expected_records, fake_real_sink.records)
2366+ # The return values from the real sink's insert_stream are propagated
2367+ # back to the original caller.
2368+ self.assertEqual(['fake tokens'], resume_tokens)
2369+ self.assertEqual(['fake missing keys'], missing_keys)
2370+ self.assertFinished(client)
2371+
2372+        """Make a simple stream with an inventory delta followed by more
2373+        records and more substreams, to test that every record and substream
2374+        from that point on is routed to the fallback sink.
2375+ from that point on are used.
2376+
2377+ This sends, in order:
2378+ * inventories substream: rev1, rev2, rev3. rev2 and rev3 are
2379+ inventory-deltas.
2380+ * texts substream: (some-rev, some-file)
2381+ """
2382+ # Define a stream using generators so that it isn't rewindable.
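2383+        # The fallback happens part-way through consuming the stream, so
2384+        # the sink must cope with a stream it cannot restart.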
2383+ def stream_with_inv_delta():
2384+ yield ('inventories', inventory_substream_with_delta())
2385+ yield ('texts', [
2386+ versionedfile.FulltextContentFactory(
2387+ ('some-rev', 'some-file'), (), None, 'content')])
2388+ def inventory_substream_with_delta():
2389+ # An empty inventory fulltext. This will be streamed normally.
2390+ inv = inventory.Inventory(revision_id='rev1')
2391+ text = fmt._serializer.write_inventory_to_string(inv)
2392+ yield versionedfile.FulltextContentFactory(
2393+ ('rev1',), (), None, text)
2394+ # An inventory delta. This can't be streamed via this verb, so it
2395+ # will trigger a fallback to VFS insert_stream.
2396+            entry = inv.make_entry(
2397+                'directory', 'newdir', inv.root.file_id, 'newdir-id')
2398+            delta = [(None, 'newdir', 'newdir-id', entry)]
2399+            yield versionedfile.InventoryDeltaContentFactory(
2400+                ('rev2',), (('rev1',),), None, delta, 'rev1', (True, False))
2401+            # Another delta against the same basis inventory.
2402+            yield versionedfile.InventoryDeltaContentFactory(
2403+                ('rev3',), (('rev1',),), None, delta, 'rev1', (True, False))
2404+ return stream_with_inv_delta()
2405+
2406+
2407+class TestRepositoryInsertStream_1_18(TestRemoteRepository):
2408+
2409+ def test_unlocked_repo(self):
2410+ transport_path = 'quack'
2411+ repo, client = self.setup_fake_client_and_repository(transport_path)
2412+ client.add_expected_call(
2413+ 'Repository.insert_stream_1.18', ('quack/', ''),
2414+ 'success', ('ok',))
2415+ client.add_expected_call(
2416+ 'Repository.insert_stream_1.18', ('quack/', ''),
2417+ 'success', ('ok',))
2418+ sink = repo._get_sink()
2419+ fmt = repository.RepositoryFormat.get_default_format()
2420+ resume_tokens, missing_keys = sink.insert_stream([], fmt, [])
2421+ self.assertEqual([], resume_tokens)
2422+ self.assertEqual(set(), missing_keys)
2423+ self.assertFinished(client)
2424+
2425+ def test_locked_repo_with_no_lock_token(self):
2426+ transport_path = 'quack'
2427+ repo, client = self.setup_fake_client_and_repository(transport_path)
2428+ client.add_expected_call(
2429+ 'Repository.lock_write', ('quack/', ''),
2430+ 'success', ('ok', ''))
2431+ client.add_expected_call(
2432+ 'Repository.insert_stream_1.18', ('quack/', ''),
2433+ 'success', ('ok',))
2434+ client.add_expected_call(
2435+ 'Repository.insert_stream_1.18', ('quack/', ''),
2436+ 'success', ('ok',))
2437+ repo.lock_write()
2438+ sink = repo._get_sink()
2439+ fmt = repository.RepositoryFormat.get_default_format()
2440+ resume_tokens, missing_keys = sink.insert_stream([], fmt, [])
2441+ self.assertEqual([], resume_tokens)
2442+ self.assertEqual(set(), missing_keys)
2443+ self.assertFinished(client)
2444+
2445+ def test_locked_repo_with_lock_token(self):
2446+ transport_path = 'quack'
2447+ repo, client = self.setup_fake_client_and_repository(transport_path)
2448+ client.add_expected_call(
2449+ 'Repository.lock_write', ('quack/', ''),
2450+ 'success', ('ok', 'a token'))
2451+ client.add_expected_call(
2452+ 'Repository.insert_stream_1.18', ('quack/', '', 'a token'),
2453+ 'success', ('ok',))
2454+ client.add_expected_call(
2455+ 'Repository.insert_stream_1.18', ('quack/', '', 'a token'),
2456 'success', ('ok',))
2457 repo.lock_write()
2458 sink = repo._get_sink()
2459
2460=== modified file 'bzrlib/tests/test_selftest.py'
2461--- bzrlib/tests/test_selftest.py 2009-07-20 04:22:47 +0000
2462+++ bzrlib/tests/test_selftest.py 2009-07-22 00:35:23 +0000
2463@@ -124,7 +124,7 @@
2464 self.assertEqual(sample_permutation,
2465 get_transport_test_permutations(MockModule()))
2466
2467- def test_scenarios_invlude_all_modules(self):
2468+ def test_scenarios_include_all_modules(self):
2469 # this checks that the scenario generator returns as many permutations
2470 # as there are in all the registered transport modules - we assume if
2471 # this matches its probably doing the right thing especially in
2472@@ -297,14 +297,12 @@
2473 scenarios = make_scenarios(server1, server2, formats)
2474 self.assertEqual([
2475 ('str,str,str',
2476- {'interrepo_class': str,
2477- 'repository_format': 'C1',
2478+ {'repository_format': 'C1',
2479 'repository_format_to': 'C2',
2480 'transport_readonly_server': 'b',
2481 'transport_server': 'a'}),
2482 ('int,str,str',
2483- {'interrepo_class': int,
2484- 'repository_format': 'D1',
2485+ {'repository_format': 'D1',
2486 'repository_format_to': 'D2',
2487 'transport_readonly_server': 'b',
2488 'transport_server': 'a'})],
2489
2490=== modified file 'bzrlib/tests/test_xml.py'
2491--- bzrlib/tests/test_xml.py 2009-04-03 21:50:40 +0000
2492+++ bzrlib/tests/test_xml.py 2009-07-22 00:35:23 +0000
2493@@ -19,6 +19,7 @@
2494 from bzrlib import (
2495 errors,
2496 inventory,
2497+ xml6,
2498 xml7,
2499 xml8,
2500 serializer,
2501@@ -139,6 +140,14 @@
2502 </inventory>
2503 """
2504
2505+_expected_inv_v6 = """<inventory format="6" revision_id="rev_outer">
2506+<directory file_id="tree-root-321" name="" revision="rev_outer" />
2507+<directory file_id="dir-id" name="dir" parent_id="tree-root-321" revision="rev_outer" />
2508+<file file_id="file-id" name="file" parent_id="tree-root-321" revision="rev_outer" text_sha1="A" text_size="1" />
2509+<symlink file_id="link-id" name="link" parent_id="tree-root-321" revision="rev_outer" symlink_target="a" />
2510+</inventory>
2511+"""
2512+
2513 _expected_inv_v7 = """<inventory format="7" revision_id="rev_outer">
2514 <directory file_id="tree-root-321" name="" revision="rev_outer" />
2515 <directory file_id="dir-id" name="dir" parent_id="tree-root-321" revision="rev_outer" />
2516@@ -377,6 +386,17 @@
2517 for path, ie in inv.iter_entries():
2518 self.assertEqual(ie, inv2[ie.file_id])
2519
2520+ def test_roundtrip_inventory_v6(self):
2521+ inv = self.get_sample_inventory()
2522+ txt = xml6.serializer_v6.write_inventory_to_string(inv)
2523+ lines = xml6.serializer_v6.write_inventory_to_lines(inv)
2524+ self.assertEqual(bzrlib.osutils.split_lines(txt), lines)
2525+ self.assertEqualDiff(_expected_inv_v6, txt)
2526+ inv2 = xml6.serializer_v6.read_inventory_from_string(txt)
2527+ self.assertEqual(4, len(inv2))
2528+ for path, ie in inv.iter_entries():
2529+ self.assertEqual(ie, inv2[ie.file_id])
2530+
2531 def test_wrong_format_v7(self):
2532 """Can't accidentally open a file with wrong serializer"""
2533 s_v6 = bzrlib.xml6.serializer_v6
2534
2535=== modified file 'bzrlib/versionedfile.py'
2536--- bzrlib/versionedfile.py 2009-07-06 20:21:34 +0000
2537+++ bzrlib/versionedfile.py 2009-07-22 00:35:23 +0000
2538@@ -34,6 +34,8 @@
2539 errors,
2540 groupcompress,
2541 index,
2542+ inventory,
2543+ inventory_delta,
2544 knit,
2545 osutils,
2546 multiparent,
2547@@ -158,6 +160,31 @@
2548 self.storage_kind)
2549
2550
2551+class InventoryDeltaContentFactory(ContentFactory):
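2552+    """A ContentFactory whose content is an in-memory inventory delta.
2553+
2554+    get_bytes_as('inventory-delta') returns the raw
2555+    (basis_id, key, delta, format_flags) tuple;
2556+    get_bytes_as('inventory-delta-bytes') returns the delta serialized
2557+    with inventory_delta.InventoryDeltaSerializer.
2558+    """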
2552+
2553+ def __init__(self, key, parents, sha1, delta, basis_id, format_flags,
2554+ repo=None):
2555+ self.sha1 = sha1
2556+ self.storage_kind = 'inventory-delta'
2557+ self.key = key
2558+ self.parents = parents
2559+ self._delta = delta
2560+ self._basis_id = basis_id
2561+ self._format_flags = format_flags
2562+ self._repo = repo
2563+
2564+ def get_bytes_as(self, storage_kind):
2565+ if storage_kind == self.storage_kind:
2566+ return self._basis_id, self.key, self._delta, self._format_flags
2567+ elif storage_kind == 'inventory-delta-bytes':
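2568+            # format_flags is (rich_root, tree_refs); declare them on the
2569+            # serializer before serializing the delta.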
2568+ serializer = inventory_delta.InventoryDeltaSerializer()
2569+ serializer.require_flags(*self._format_flags)
2570+ return ''.join(serializer.delta_to_lines(
2571+ self._basis_id, self.key, self._delta))
2572+ raise errors.UnavailableRepresentation(self.key, storage_kind,
2573+ self.storage_kind)
2574+
2575+
2576 class AbsentContentFactory(ContentFactory):
2577 """A placeholder content factory for unavailable texts.
2578
2579@@ -1551,13 +1578,15 @@
2580 record.get_bytes_as(record.storage_kind) call.
2581 """
2582 self._bytes_iterator = bytes_iterator
2583- self._kind_factory = {'knit-ft-gz':knit.knit_network_to_record,
2584- 'knit-delta-gz':knit.knit_network_to_record,
2585- 'knit-annotated-ft-gz':knit.knit_network_to_record,
2586- 'knit-annotated-delta-gz':knit.knit_network_to_record,
2587- 'knit-delta-closure':knit.knit_delta_closure_to_records,
2588- 'fulltext':fulltext_network_to_record,
2589- 'groupcompress-block':groupcompress.network_block_to_records,
2590+ self._kind_factory = {
2591+ 'fulltext': fulltext_network_to_record,
2592+ 'groupcompress-block': groupcompress.network_block_to_records,
2593+ 'inventory-delta': inventory_delta_network_to_record,
2594+ 'knit-ft-gz': knit.knit_network_to_record,
2595+ 'knit-delta-gz': knit.knit_network_to_record,
2596+ 'knit-annotated-ft-gz': knit.knit_network_to_record,
2597+ 'knit-annotated-delta-gz': knit.knit_network_to_record,
2598+ 'knit-delta-closure': knit.knit_delta_closure_to_records,
2599 }
2600
2601 def read(self):
2602@@ -1583,6 +1612,21 @@
2603 return [FulltextContentFactory(key, parents, None, fulltext)]
2604
2605
2606+def inventory_delta_network_to_record(kind, bytes, line_end):
2607+    """Convert a network inventory-delta record to a record object."""
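2608+    # Wire layout (the inverse of record_to_inventory_delta_bytes below):
2609+    # a 4-byte big-endian length prefix, the bencoded (key, parents)
2610+    # metadata, then the serialized delta lines.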
2608+ meta_len, = struct.unpack('!L', bytes[line_end:line_end+4])
2609+ record_meta = bytes[line_end+4:line_end+4+meta_len]
2610+ key, parents = bencode.bdecode_as_tuple(record_meta)
2611+ if parents == 'nil':
2612+ parents = None
2613+ inventory_delta_bytes = bytes[line_end+4+meta_len:]
2614+ deserialiser = inventory_delta.InventoryDeltaSerializer()
2615+ parse_result = deserialiser.parse_text_bytes(inventory_delta_bytes)
2616+ basis_id, new_id, rich_root, tree_refs, delta = parse_result
2617+ return [InventoryDeltaContentFactory(
2618+ key, parents, None, delta, basis_id, (rich_root, tree_refs))]
2619+
2620+
2621 def _length_prefix(bytes):
2622 return struct.pack('!L', len(bytes))
2623
2624@@ -1598,6 +1642,17 @@
2625 _length_prefix(record_meta), record_meta, record_content)
2626
2627
2628+def record_to_inventory_delta_bytes(record):
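2629+    """Serialize an inventory-delta record for the network.
2630+
2631+    The output is the storage-kind line 'inventory-delta', a length
2632+    prefix, the bencoded (key, parents) metadata, then the delta bytes.
2633+    """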
2629+ record_content = record.get_bytes_as('inventory-delta-bytes')
2630+ if record.parents is None:
2631+ parents = 'nil'
2632+ else:
2633+ parents = record.parents
2634+ record_meta = bencode.bencode((record.key, parents))
2635+ return "inventory-delta\n%s%s%s" % (
2636+ _length_prefix(record_meta), record_meta, record_content)
2637+
2638+
2639 def sort_groupcompress(parent_map):
2640 """Sort and group the keys in parent_map into groupcompress order.
2641
2642
2643=== modified file 'bzrlib/xml5.py'
2644--- bzrlib/xml5.py 2009-04-04 02:50:01 +0000
2645+++ bzrlib/xml5.py 2009-07-22 00:35:23 +0000
2646@@ -39,8 +39,8 @@
2647 format = elt.get('format')
2648 if format is not None:
2649 if format != '5':
2650- raise BzrError("invalid format version %r on inventory"
2651- % format)
2652+ raise errors.BzrError("invalid format version %r on inventory"
2653+ % format)
2654 data_revision_id = elt.get('revision_id')
2655 if data_revision_id is not None:
2656 revision_id = cache_utf8.encode(data_revision_id)