A little bit of fiddling later: this is a diff from an updated copy of
John's branch. I'm pushing that to ~lifeless/bzr/bug-420652.

-Rob

=== modified file 'NEWS'
--- NEWS	2009-09-03 23:43:16 +0000
+++ NEWS	2009-09-04 00:48:39 +0000
@@ -33,6 +33,13 @@
 * Don't restrict the command name used to run the test suite.
   (Vincent Ladeuil, #419950)
 
+* Fetches from 2a to 2a are now again requested in 'groupcompress' order.
+  Groups that are seen as 'underutilized' will be repacked on-the-fly.
+  This means that when the source is fully packed, there is minimal
+  overhead during the fetch, but if the source is poorly packed the result
+  is a fairly well packed repository (not as good as 'bzr pack' but
+  good-enough.) (Robert Collins, John Arbash Meinel, #402652)
+
 * Network streams now decode adjacent records of the same type into a
   single stream, reducing layering churn. (Robert Collins)
 
@@ -109,6 +116,7 @@
 * The main table of contents now provides links to the new Migration Docs
   and Plugins Guide. (Ian Clatworthy)
 
+
 bzr 2.0rc1
 ##########
 
=== modified file 'bzrlib/groupcompress.py'
--- bzrlib/groupcompress.py	2009-08-30 21:34:42 +0000
+++ bzrlib/groupcompress.py	2009-09-04 00:45:56 +0000
@@ -457,7 +457,6 @@
         # There are code paths that first extract as fulltext, and then
         # extract as storage_kind (smart fetch). So we don't break the
         # refcycle here, but instead in manager.get_record_stream()
-        # self._manager = None
         if storage_kind == 'fulltext':
             return self._bytes
         else:
@@ -469,6 +468,14 @@
 class _LazyGroupContentManager(object):
     """This manages a group of _LazyGroupCompressFactory objects."""
 
+    _max_cut_fraction = 0.75 # We allow a block to be trimmed to 75% of
+                             # current size, and still be considered
+                             # reusable
+    _full_block_size = 4*1024*1024
+    _full_mixed_block_size = 2*1024*1024
+    _full_enough_block_size = 3*1024*1024 # size at which we won't repack
+    _full_enough_mixed_block_size = 2*768*1024 # 1.5MB
+
     def __init__(self, block):
         self._block = block
         # We need to preserve the ordering
@@ -546,22 +553,23 @@
         #       time (self._block._content) is a little expensive.
         self._block._ensure_content(self._last_byte)
 
-    def _check_rebuild_block(self):
+    def _check_rebuild_action(self):
         """Check to see if our block should be repacked."""
         total_bytes_used = 0
         last_byte_used = 0
         for factory in self._factories:
             total_bytes_used += factory._end - factory._start
-            last_byte_used = max(last_byte_used, factory._end)
-        # If we are using most of the bytes from the block, we have nothing
-        # else to check (currently more that 1/2)
+            if last_byte_used < factory._end:
+                last_byte_used = factory._end
+        # If we are using more than half of the bytes from the block, we have
+        # nothing else to check
         if total_bytes_used * 2 >= self._block._content_length:
-            return
-        # Can we just strip off the trailing bytes? If we are going to be
-        # transmitting more than 50% of the front of the content, go ahead
+            return None, last_byte_used, total_bytes_used
+        # We are using less than 50% of the content. Is the content we are
+        # using at the beginning of the block? If so, we can just trim the
+        # tail, rather than rebuilding from scratch.
         if total_bytes_used * 2 > last_byte_used:
-            self._trim_block(last_byte_used)
-            return
+            return 'trim', last_byte_used, total_bytes_used
 
         # We are using a small amount of the data, and it isn't just packed
         # nicely at the front, so rebuild the content.
@@ -574,7 +582,77 @@
         #       expanding many deltas into fulltexts, as well.
         # If we build a cheap enough 'strip', then we could try a strip,
         # if that expands the content, we then rebuild.
-        self._rebuild_block()
+        return 'rebuild', last_byte_used, total_bytes_used
+
+    def check_is_well_utilized(self):
+        """Is the current block considered 'well utilized'?
+
+        This is a bit of a heuristic, but it basically asks if the current
+        block considers itself to be a fully developed group, rather than just
+        a loose collection of data.
+        """
+        if len(self._factories) == 1:
+            # A block of length 1 is never considered 'well utilized' :)
+            return False
+        action, last_byte_used, total_bytes_used = self._check_rebuild_action()
+        block_size = self._block._content_length
+        if total_bytes_used < block_size * self._max_cut_fraction:
+            # This block wants to trim itself small enough that we want to
+            # consider it under-utilized.
+            return False
+        # TODO: This code is meant to be the twin of _insert_record_stream's
+        #       'start_new_block' logic. It would probably be better to factor
+        #       out that logic into a shared location, so that it stays
+        #       together better.
+        # We currently assume a block is properly utilized whenever it is >75%
+        # of the size of a 'full' block. In normal operation, a block is
+        # considered full when it hits 4MB of same-file content. So any block
+        # >3MB is 'full enough'.
+        # The only time this isn't true is when a given block has large-object
+        # content. (a single file >4MB, etc.)
+        # Under these circumstances, we allow a block to grow to
+        # 2 x largest_content. Which means that if a given block had a large
+        # object, it may actually be under-utilized. However, given that this
+        # is 'pack-on-the-fly' it is probably reasonable to not repack large
+        # content blobs on-the-fly.
+        if block_size >= self._full_enough_block_size:
+            return True
+        # If a block is <3MB, it still may be considered 'full' if it contains
+        # mixed content. The current rule is 2MB of mixed content is considered
+        # full. So check to see if this block contains mixed content, and
+        # set the threshold appropriately.
+        common_prefix = None
+        for factory in self._factories:
+            prefix = factory.key[:-1]
+            if common_prefix is None:
+                common_prefix = prefix
+            elif prefix != common_prefix:
+                # Mixed content, check the size appropriately
+                if block_size >= self._full_enough_mixed_block_size:
+                    return True
+                break
+        # The content failed both the mixed check and the single-content check,
+        # so obviously it is not fully utilized.
+        # TODO: there is one other constraint that isn't being checked,
+        #       namely, that the entries in the block are in the appropriate
+        #       order. For example, you could insert the entries in exactly
+        #       reverse groupcompress order, and we would think that is ok.
+        #       (all the right objects are in one group, and it is fully
+        #       utilized, etc.) For now, we assume that case is rare,
+        #       especially since we should always fetch in 'groupcompress'
+        #       order.
+        return False
+
+    def _check_rebuild_block(self):
+        action, last_byte_used, total_bytes_used = self._check_rebuild_action()
+        if action is None:
+            return
+        if action == 'trim':
+            self._trim_block(last_byte_used)
+        elif action == 'rebuild':
+            self._rebuild_block()
+        else:
+            raise ValueError('unknown rebuild action: %r' % (action,))
 
     def _wire_bytes(self):
         """Return a byte stream suitable for transmitting over the wire."""
@@ -1570,6 +1648,7 @@
         block_length = None
         # XXX: TODO: remove this, it is just for safety checking for now
         inserted_keys = set()
+        reuse_this_block = reuse_blocks
         for record in stream:
             # Raise an error when a record is missing.
             if record.storage_kind == 'absent':
@@ -1583,10 +1662,20 @@
             if reuse_blocks:
                 # If the reuse_blocks flag is set, check to see if we can just
                 # copy a groupcompress block as-is.
+                # We only check on the first record (groupcompress-block) not
+                # on all of the (groupcompress-block-ref) entries.
+                # The reuse_this_block flag is then kept for as long as
+                if record.storage_kind == 'groupcompress-block':
+                    # Check to see if we really want to re-use this block
+                    insert_manager = record._manager
+                    reuse_this_block = insert_manager.check_is_well_utilized()
+            else:
+                reuse_this_block = False
+            if reuse_this_block:
+                # We still want to reuse this block
                 if record.storage_kind == 'groupcompress-block':
                     # Insert the raw block into the target repo
                     insert_manager = record._manager
-                    insert_manager._check_rebuild_block()
                     bytes = record._manager._block.to_bytes()
                     _, start, length = self._access.add_raw_records(
                         [(None, len(bytes))], bytes)[0]
@@ -1597,6 +1686,11 @@
                                      'groupcompress-block-ref'):
                     if insert_manager is None:
                         raise AssertionError('No insert_manager set')
+                    if insert_manager is not record._manager:
+                        raise AssertionError('insert_manager does not match'
+                            ' the current record, we cannot be positive'
+                            ' that the appropriate content was inserted.'
+                            )
                     value = "%d %d %d %d" % (block_start, block_length,
                                              record._start, record._end)
                     nodes = [(record.key, value, (record.parents,))]
=== modified file 'bzrlib/repofmt/groupcompress_repo.py'
--- bzrlib/repofmt/groupcompress_repo.py	2009-08-24 19:34:13 +0000
+++ bzrlib/repofmt/groupcompress_repo.py	2009-08-31 02:10:12 +0000
@@ -932,7 +932,7 @@
         super(GroupCHKStreamSource, self).__init__(from_repository, to_format)
         self._revision_keys = None
         self._text_keys = None
-        # self._text_fetch_order = 'unordered'
+        self._text_fetch_order = 'groupcompress'
         self._chk_id_roots = None
         self._chk_p_id_roots = None
 
@@ -949,7 +949,7 @@
             p_id_roots_set = set()
             source_vf = self.from_repository.inventories
             stream = source_vf.get_record_stream(inventory_keys,
-                                                 'unordered', True)
+                                                 'groupcompress', True)
             for record in stream:
                 if record.storage_kind == 'absent':
                     if allow_absent:
=== modified file 'bzrlib/tests/test_groupcompress.py'
--- bzrlib/tests/test_groupcompress.py	2009-09-02 18:07:58 +0000
+++ bzrlib/tests/test_groupcompress.py	2009-09-04 00:45:56 +0000
@@ -538,7 +538,7 @@
                           'as-requested', False)]
         self.assertEqual([('b',), ('a',), ('d',), ('c',)], keys)
 
-    def test_insert_record_stream_re_uses_blocks(self):
+    def test_insert_record_stream_reuses_blocks(self):
         vf = self.make_test_vf(True, dir='source')
         def grouped_stream(revision_ids, first_parents=()):
             parents = first_parents
@@ -582,8 +582,14 @@
         vf2 = self.make_test_vf(True, dir='target')
         # ordering in 'groupcompress' order, should actually swap the groups in
         # the target vf, but the groups themselves should not be disturbed.
-        vf2.insert_record_stream(vf.get_record_stream(
-            [(r,) for r in 'abcdefgh'], 'groupcompress', False))
+        def small_size_stream():
+            for record in vf.get_record_stream([(r,) for r in 'abcdefgh'],
+                                                'groupcompress', False):
+                record._manager._full_enough_block_size = \
+                    record._manager._block._content_length
+                yield record
+
+        vf2.insert_record_stream(small_size_stream())
         stream = vf2.get_record_stream([(r,) for r in 'abcdefgh'],
                                        'groupcompress', False)
         vf2.writer.end()
@@ -594,6 +600,44 @@
                              record._manager._block._z_content)
         self.assertEqual(8, num_records)
 
+    def test_insert_record_stream_packs_on_the_fly(self):
+        vf = self.make_test_vf(True, dir='source')
+        def grouped_stream(revision_ids, first_parents=()):
+            parents = first_parents
+            for revision_id in revision_ids:
+                key = (revision_id,)
+                record = versionedfile.FulltextContentFactory(
+                    key, parents, None,
+                    'some content that is\n'
+                    'identical except for\n'
+                    'revision_id:%s\n' % (revision_id,))
+                yield record
+                parents = (key,)
+        # One group, a-d
+        vf.insert_record_stream(grouped_stream(['a', 'b', 'c', 'd']))
+        # Second group, e-h
+        vf.insert_record_stream(grouped_stream(['e', 'f', 'g', 'h'],
+                                               first_parents=(('d',),)))
+        # Now copy the blocks into another vf, and see that the
+        # insert_record_stream rebuilt a new block on-the-fly because of
+        # under-utilization
+        vf2 = self.make_test_vf(True, dir='target')
+        vf2.insert_record_stream(vf.get_record_stream(
+            [(r,) for r in 'abcdefgh'], 'groupcompress', False))
+        stream = vf2.get_record_stream([(r,) for r in 'abcdefgh'],
+                                       'groupcompress', False)
+        vf2.writer.end()
+        num_records = 0
+        # All of the records should be recombined into a single block
+        block = None
+        for record in stream:
+            num_records += 1
+            if block is None:
+                block = record._manager._block
+            else:
+                self.assertIs(block, record._manager._block)
+        self.assertEqual(8, num_records)
+
     def test__insert_record_stream_no_reuse_block(self):
         vf = self.make_test_vf(True, dir='source')
         def grouped_stream(revision_ids, first_parents=()):
@@ -811,15 +855,19 @@
 
     _texts = {
         ('key1',): "this is a text\n"
-                   "with a reasonable amount of compressible bytes\n",
+                   "with a reasonable amount of compressible bytes\n"
+                   "which can be shared between various other texts\n",
         ('key2',): "another text\n"
-                   "with a reasonable amount of compressible bytes\n",
+                   "with a reasonable amount of compressible bytes\n"
+                   "which can be shared between various other texts\n",
         ('key3',): "yet another text which won't be extracted\n"
-                   "with a reasonable amount of compressible bytes\n",
+                   "with a reasonable amount of compressible bytes\n"
+                   "which can be shared between various other texts\n",
         ('key4',): "this will be extracted\n"
                    "but references most of its bytes from\n"
                    "yet another text which won't be extracted\n"
-                   "with a reasonable amount of compressible bytes\n",
+                   "with a reasonable amount of compressible bytes\n"
+                   "which can be shared between various other texts\n",
     }
     def make_block(self, key_to_text):
         """Create a GroupCompressBlock, filling it with the given texts."""
@@ -837,6 +885,13 @@
         start, end = locations[key]
         manager.add_factory(key, (), start, end)
 
+    def make_block_and_full_manager(self, texts):
+        locations, block = self.make_block(texts)
+        manager = groupcompress._LazyGroupContentManager(block)
+        for key in sorted(texts):
+            self.add_key_to_manager(key, locations, block, manager)
+        return block, manager
+
     def test_get_fulltexts(self):
         locations, block = self.make_block(self._texts)
         manager = groupcompress._LazyGroupContentManager(block)
@@ -893,8 +948,8 @@
         header_len = int(header_len)
         block_len = int(block_len)
         self.assertEqual('groupcompress-block', storage_kind)
-        self.assertEqual(33, z_header_len)
-        self.assertEqual(25, header_len)
+        self.assertEqual(34, z_header_len)
+        self.assertEqual(26, header_len)
         self.assertEqual(len(block_bytes), block_len)
         z_header = rest[:z_header_len]
         header = zlib.decompress(z_header)
@@ -934,13 +989,7 @@
         self.assertEqual([('key1',), ('key4',)], result_order)
 
     def test__check_rebuild_no_changes(self):
-        locations, block = self.make_block(self._texts)
-        manager = groupcompress._LazyGroupContentManager(block)
-        # Request all the keys, which ensures that we won't rebuild
-        self.add_key_to_manager(('key1',), locations, block, manager)
-        self.add_key_to_manager(('key2',), locations, block, manager)
-        self.add_key_to_manager(('key3',), locations, block, manager)
-        self.add_key_to_manager(('key4',), locations, block, manager)
+        block, manager = self.make_block_and_full_manager(self._texts)
         manager._check_rebuild_block()
         self.assertIs(block, manager._block)
 
@@ -971,3 +1020,50 @@
         self.assertEqual(('key4',), record.key)
         self.assertEqual(self._texts[record.key],
                          record.get_bytes_as('fulltext'))
+
+    def test_check_is_well_utilized_all_keys(self):
+        block, manager = self.make_block_and_full_manager(self._texts)
+        self.assertFalse(manager.check_is_well_utilized())
+        # Though we can fake it by changing the recommended minimum size
+        manager._full_enough_block_size = block._content_length
+        self.assertTrue(manager.check_is_well_utilized())
+        # Setting it just above causes it to fail
+        manager._full_enough_block_size = block._content_length + 1
+        self.assertFalse(manager.check_is_well_utilized())
+        # Setting the mixed-block size doesn't do anything, because the content
+        # is considered to not be 'mixed'
+        manager._full_enough_mixed_block_size = block._content_length
+        self.assertFalse(manager.check_is_well_utilized())
+
+    def test_check_is_well_utilized_mixed_keys(self):
+        texts = {}
+        f1k1 = ('f1', 'k1')
+        f1k2 = ('f1', 'k2')
+        f2k1 = ('f2', 'k1')
+        f2k2 = ('f2', 'k2')
+        texts[f1k1] = self._texts[('key1',)]
+        texts[f1k2] = self._texts[('key2',)]
+        texts[f2k1] = self._texts[('key3',)]
+        texts[f2k2] = self._texts[('key4',)]
+        block, manager = self.make_block_and_full_manager(texts)
+        self.assertFalse(manager.check_is_well_utilized())
+        manager._full_enough_block_size = block._content_length
+        self.assertTrue(manager.check_is_well_utilized())
+        manager._full_enough_block_size = block._content_length + 1
+        self.assertFalse(manager.check_is_well_utilized())
+        manager._full_enough_mixed_block_size = block._content_length
+        self.assertTrue(manager.check_is_well_utilized())
+
+    def test_check_is_well_utilized_partial_use(self):
+        locations, block = self.make_block(self._texts)
+        manager = groupcompress._LazyGroupContentManager(block)
+        manager._full_enough_block_size = block._content_length
+        self.add_key_to_manager(('key1',), locations, block, manager)
+        self.add_key_to_manager(('key2',), locations, block, manager)
+        # Just using the content from key1 and 2 is not enough to be considered
+        # 'complete'
+        self.assertFalse(manager.check_is_well_utilized())
+        # However if we add key3, then we have enough, as we only require 75%
+        # consumption
+        self.add_key_to_manager(('key4',), locations, block, manager)
+        self.assertTrue(manager.check_is_well_utilized())
=== modified file 'bzrlib/tests/test_repository.py'
--- bzrlib/tests/test_repository.py	2009-09-03 04:51:46 +0000
+++ bzrlib/tests/test_repository.py	2009-09-04 00:47:07 +0000
@@ -683,6 +683,28 @@
 
 class Test2a(tests.TestCaseWithMemoryTransport):
 
+    def test_fetch_combines_groups(self):
+        builder = self.make_branch_builder('source', format='2a')
+        builder.start_series()
+        builder.build_snapshot('1', None, [
+            ('add', ('', 'root-id', 'directory', '')),
+            ('add', ('file', 'file-id', 'file', 'content\n'))])
+        builder.build_snapshot('2', ['1'], [
+            ('modify', ('file-id', 'content-2\n'))])
+        builder.finish_series()
+        source = builder.get_branch()
+        target = self.make_repository('target', format='2a')
+        target.fetch(source.repository)
+        target.lock_read()
+        self.addCleanup(target.unlock)
+        details = target.texts._index.get_build_details(
+            [('file-id', '1',), ('file-id', '2',)])
+        file_1_details = details[('file-id', '1')]
+        file_2_details = details[('file-id', '2')]
+        # The index, and what to read off disk, should be the same for both
+        # versions of the file.
+        self.assertEqual(file_1_details[0][:3], file_2_details[0][:3])
+
     def test_format_pack_compresses_True(self):
         repo = self.make_repository('repo', format='2a')
         self.assertTrue(repo._format.pack_compresses)

--
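
For anyone who wants the pack-on-the-fly heuristic in one place without reading the whole diff: the sketch below is a simplified, standalone restatement of the reuse decision that check_is_well_utilized() makes. The function name is_well_utilized and its flattened parameters (block_size, total_bytes_used, num_entries, is_mixed) are illustrative only and are not part of the patch; the real code works on _LazyGroupContentManager state, and the constants mirror the class attributes _full_enough_block_size, _full_enough_mixed_block_size and _max_cut_fraction introduced above.

    # Illustrative sketch only -- not part of the patch. The real logic lives
    # in _LazyGroupContentManager.check_is_well_utilized().
    FULL_ENOUGH_BLOCK_SIZE = 3 * 1024 * 1024       # same-prefix content threshold
    FULL_ENOUGH_MIXED_BLOCK_SIZE = 2 * 768 * 1024  # mixed-content threshold (1.5MB)
    MAX_CUT_FRACTION = 0.75                        # minimum fraction of bytes used

    def is_well_utilized(block_size, total_bytes_used, num_entries, is_mixed):
        """Would a block with this shape be copied as-is during fetch?"""
        if num_entries == 1:
            # A block holding a single entry is never considered well utilized.
            return False
        if total_bytes_used < block_size * MAX_CUT_FRACTION:
            # Most of the block would be thrown away, so repack on the fly.
            return False
        if block_size >= FULL_ENOUGH_BLOCK_SIZE:
            # Large single-prefix blocks are treated as already packed.
            return True
        if is_mixed and block_size >= FULL_ENOUGH_MIXED_BLOCK_SIZE:
            # Mixed-content blocks get a lower 'full enough' bar.
            return True
        return False

    # A freshly packed 4MB block whose bytes are all wanted is reused as-is,
    # while a sparse 1MB block is rebuilt by the target repository:
    print(is_well_utilized(4 * 1024 * 1024, 4 * 1024 * 1024, 20, False))  # True
    print(is_well_utilized(1024 * 1024, 200 * 1024, 20, False))           # False

The net effect is the one described in the NEWS entry: a well-packed source streams with minimal overhead, while a poorly packed source gets its under-utilized blocks rebuilt as they are inserted into the target.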