To start with, I think your layering here is pretty nice. Adding an object does make managing the state a bit clearer. get_record_stream() is complex enough that it probably should have started out as an object interface rather than just a simple generator. Oh well.

In short, I think we have a race condition because of how LRUSizeCache (or just LRUCache) interacts with _get_blocks() being in arbitrary ordering. And we need a different caching algorithm that can ensure requested blocks are never flushed from the cache while they are still needed.

The comment here is no longer correct:

+        for read_memo in read_memos:
+            try:
+                yield cached[read_memo]
+            except KeyError:
+                # read the group
+                zdata = raw_records.next()
+                # decompress - whole thing - this is not a bug, as it
+                # permits caching. We might want to store the partially
+                # decompresed group and decompress object, so that recent
+                # texts are not penalised by big groups.
+                block = GroupCompressBlock.from_bytes(zdata)
+                self._group_cache[read_memo] = block
+                cached[read_memo] = block
+                yield block

^- We are caching the 'block' and not the raw content anymore. And the block may only partially evaluate the compressed content.

However, I'm more concerned that in this loop:

+        for read_memo in read_memos:
+            if read_memo in cached:
+                # Don't fetch what we already have
+                continue
+            if read_memo in not_cached_seen:
+                # Don't try to fetch the same data twice
+                continue
+            not_cached.append(read_memo)
+            not_cached_seen.add(read_memo)
+        raw_records = self._access.get_raw_records(not_cached)
+        for read_memo in read_memos:
+            try:
+                yield cached[read_memo]
+            except KeyError:
+                # read the group
+                zdata = raw_records.next()
+                # decompress - whole thing - this is not a bug, as it
+                # permits caching. We might want to store the partially
+                # decompresed group and decompress object, so that recent
+                # texts are not penalised by big groups.
+                block = GroupCompressBlock.from_bytes(zdata)
+                self._group_cache[read_memo] = block
+                cached[read_memo] = block
+                yield block

^- There is an assumption that raw_records is in the exact ordering of 'read_memos'. And I think there is another small assumption in here: the code that filters out duplicates requires that groups always fit in the cache and aren't expired before you get to the duplicate.

Perhaps an example:

Assume we have the groups G1=>G4, and that we have the texts G1,T1, etc. If the request ends up being for:

  [G1,T1], [G2, T1-T100], [G3, T1-T100], [G1,T2]

The code above will not request G1 two times. However, it will cache[G2] and cache[G3], which gives it time for G1 to be flushed from the cache.

Even more worrisome are "large groups", which may never get put into the cache in the first place. (LRUSizeCache says "if a request is larger than my size, don't cache it".)

I think we'll never run into these bugs in test data, but we'll see them 'in the wild' once we have data that may not fit well in the cache.

So I think what we really need is a different caching logic. Namely, "_get_blocks()" could keep a counter for how many times it needs a given block, and not flush the block out of the cache until that count is satisfied.

In addition, it is probably reasonable to continue to cache things in gcvf._cache. I'm not sure about the LRU effects, and whether we should always make a request on the _cache to keep it in sync with the actual requests...

Then again, I think the "batching cache" is all the caching we really need. So I would be tempted to make that the only cache, and see what impact it has. (Do we need caching that persists between get_record_stream calls? We might, but things like CHKMap have their own cache...)

Another possibility would be to use the cache as-is, but a cache miss re-requests the block.
However, I don't think we can make a new request while we have a read pending, so that is probably much more complex than just changing the caching logic.

+            # Batch up as many keys as we can until either:
+            #  - we encounter an unadded ref, or
+            #  - we run out of keys, or
+            #  - the total bytes to retrieve for this batch > 256k
+            batcher = _BatchingBlockFetcher(self, locations)
+            BATCH_SIZE = 2**18

^- BATCH_SIZE should at a minimum be a global. I'm not sure why you chose 256k vs 64k vs... nor why you don't use something like transport.recommended_page_size() (4k locally, 64k remote).

Going further, though, the issue is that we aren't paying attention to the 'index' value of the groups when we consider how we want to batch. Specifically, you don't gain anything by batching index1 with index2, because that requires reading 2 files, which requires 2 round trips. Note that get_raw_records() does split things yet again into index-based batches.

Now I suppose if my patch for bug #402645 goes through, then we will be fetching 'unordered' and the _get_io_ordered_source_keys will group things by index, and that should help a bit there.

+                if batcher.total_bytes > BATCH_SIZE:
+                    # Ok, this batch is big enough.  Yield some results.
+                    for _ in batcher.yield_factories():
+                        yield _

^- I personally don't like to see "_" as a variable that ever gets used on the right hand side. I'd prefer:

  for factory in batcher.yield_factories():
      yield factory

I would like to see tests added as part of this. Possibly direct tests of _BatchingBlockFetcher.

Ideally we would have some tests that we do, in fact, read in big batches so that we don't accidentally regress. A regression here isn't data-wise incorrect, but can have a large impact on performance for certain users. So it is a bit hard to detect, and requires someone going "something doesn't seem right".
That said, it is probably pretty hard to test accurately, and it would be a shame to miss out on the benefit for a long time while waiting for testing.

Why not have "add_key()" return the number of total bytes so far, so that this:

+            batcher.add_key(key)
+            if batcher.total_bytes > BATCH_SIZE:
+                # Ok, this batch is big enough.  Yield some results.
+                for _ in batcher.yield_factories():
+                    yield _

becomes:

  if batcher.add_key(key) > BATCH_SIZE:
      ...

I'm just slightly hesitant about looking at an object's attributes rather than function return values. Perhaps just overly so.

I don't really like the idea that we are maintaining all the various read orders in lists of various lengths that have to be kept in sync. What I mean is that you have:

  self.keys       a list with an entry for every key
  memos_to_get    a list that only gets a new entry when there is a transition
  not_cached      a list similar to memos_to_get, but containing only unique items

And then stuff like:

  blocks = _get_blocks(memos_to_get)

which then assumes that the blocks can be read with blocks.next() with 100% fidelity. I realize this is true, but it seems too easy to get skew. And if you end up off-by-one, the data stream is corrupted and you don't notice.

Maybe if we just check that:

  block = blocks.next(); block.location == read_memo ?

That might be sufficient. Just some sort of check that everything is still in lock-step as we expect it to be.

Or even:

  block_read_memo, block = blocks.next()
  assert block_read_memo == read_memo

I don't think blocks actually have a place to store where they were read from, but we should trivially have the read memo when we are yielding the block.
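
To make the counter idea and the lock-step check a bit more concrete, here is a rough sketch of what I have in mind. It is not meant as the actual implementation: get_blocks, iter_blocks_checked, fetch_raw and parse are all hypothetical names, with fetch_raw standing in for self._access.get_raw_records and parse for GroupCompressBlock.from_bytes. It assumes fetch_raw returns records in the order requested and that read memos are hashable, and it is written for current Python (dict insertion order, next(it)) rather than the Python 2 style of the patch.

```python
from collections import Counter


def get_blocks(read_memos, fetch_raw, parse):
    """Yield (read_memo, block) pairs in the order of read_memos.

    fetch_raw and parse are hypothetical stand-ins; fetch_raw is assumed
    to return raw records in the same order as the memos it is given.
    """
    # How many more times each block still has to be yielded.
    pending = Counter(read_memos)
    # Request each distinct memo once, in first-seen order.
    unique = list(dict.fromkeys(read_memos))
    raw_records = iter(fetch_raw(unique))
    held = {}  # blocks pinned until their pending count reaches zero
    for read_memo in read_memos:
        if read_memo not in held:
            # First use of this memo, so its raw record is the next one.
            held[read_memo] = parse(next(raw_records))
        block = held[read_memo]
        pending[read_memo] -= 1
        if pending[read_memo] == 0:
            # Last use in this request: only now may the block be dropped.
            del held[read_memo]
        yield read_memo, block


def iter_blocks_checked(read_memos, blocks):
    """Yield blocks, failing loudly if the stream gets out of step."""
    for read_memo in read_memos:
        pair = next(blocks, None)
        if pair is None or pair[0] != read_memo:
            raise AssertionError(
                "block stream out of step at %r (got %r)" % (read_memo, pair))
        yield pair[1]
```

With the [G1, G2, G3, G1] request from the example above, G1 is fetched once and stays pinned until its second use, regardless of how big G2 and G3 are. The cost is that memory use is bounded by the request rather than by a fixed cache size, which is part of why the batch size matters.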