Merge lp:~jderose/dmedia/fix-1247530 into lp:dmedia
| Status: | Merged |
|---|---|
| Merged at revision: | 760 |
| Proposed branch: | lp:~jderose/dmedia/fix-1247530 |
| Merge into: | lp:dmedia |
| Diff against target: | 1868 lines (+1306/-230), 8 files modified |
| To merge this branch: | bzr merge lp:~jderose/dmedia/fix-1247530 |

Files modified:

* dmedia/core.py (+220/-55)
* dmedia/local.py (+9/-9)
* dmedia/metastore.py (+219/-53)
* dmedia/tests/test_core.py (+144/-0)
* dmedia/tests/test_local.py (+37/-0)
* dmedia/tests/test_metastore.py (+490/-112)
* dmedia/tests/test_views.py (+149/-0)
* dmedia/views.py (+38/-1)
| Reviewer | Review Type | Date Requested | Status |
|---|---|---|---|
| David Jordan | | | Approve |
Review via email: mp+196045@code.launchpad.net
Description of the change
For background, please see this bug:
https:/
Changes include:
* Replace core._vigilance_worker() with the new core.Vigilance class
* The copy-increasing behavior (Vigilance) now uses an IO cost accounting model and takes the least expensive route (IO-wise) to get all files at rank=1 up to rank=2, then all files at rank=2 up to rank=3, and so on; as a result, Dmedia now converges much more quickly; the biggest gain is that Vigilance will now simply verify a downgraded file, whereas before it would create a new copy (verifying is the cheapest option in the IO cost accounting model); see the sketches after this list
* When downloading from a local peer, Vigilance now uses the dmedia/machine docs to determine which FileStores are connected to each peer, and thereby which peers should have a particular file; this is much more efficient than the previous approach, which was to make a HEAD request to each peer until the file was found; this likewise means Dmedia now converges more quickly when you have multiple devices in your Dmedia library (especially with 3 or more peers)
* LocalStores.filter_by_avail() and LocalStores.find_dst_store() now take an explicit free-space threshold argument instead of relying on the (now removed) MIN_FREE_SPACE constant
* In the metastore module, add the MIN_BYTES_FREE constant (which replaces local.MIN_FREE_SPACE) and rename RECLAIM_BYTES to the clearer and more consistent MAX_BYTES_FREE (both are now base-10 GB rather than GiB); add unit tests for these constants, in particular to enforce that (MAX_BYTES_FREE >= 2 * MIN_BYTES_FREE)
* Vigilance now does preemptive copy-increasing up to MAX_BYTES_FREE: in other words, a 4th copy will be speculatively created, but only while the larger MAX_BYTES_FREE available-space threshold is still met; these copies are created in descending order of atime; in tandem with MetaStore.reclaim_all() (whose threshold is likewise MAX_BYTES_FREE), this keeps extra copies of recently used files around whenever space allows
* Replace MetaStore.iter_fragile() with MetaStore.iter_files_at_rank() and MetaStore.iter_fragile_files()
* Add get_rank() function, which MetaStore.iter_files_at_rank() uses to skip files whose rank has already increased by the time their doc is fetched
* Add MetaStore.wait_for_fragile_files(), which replaces the event-based monitoring half of the old iter_fragile() with a single longpoll request to the _changes feed
* Replace MetaStore.iter_actionable_fragile() with the set-based decision logic in Vigilance.up_rank()
* Add MetaStore.iter_preempt_files(), which yields the candidate files for preemptive copy-increasing
* Add get_copies() function, which MetaStore.iter_preempt_files() uses to skip files that no longer have exactly copies=3
* Add new file/preempt view to drive the preemptive copy-increasing
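To make the rank ordering concrete, here is a minimal sketch of the rank metric, mirroring the new get_rank() function in the diff below (minus the in-place schema coercion the real function performs on broken docs):

```python
def rank(doc):
    # rank = number of drives assumed to hold a copy, plus the sum of the
    # assumed durability of those copies, each term capped at 3; so rank
    # ranges from 0 (no copies at all) to 6 (three verified copies).
    stored = doc.get('stored', {})
    drives = min(3, len(stored))
    copies = min(3, sum(v['copies'] for v in stored.values()))
    return drives + copies

# Two physical copies, one downgraded (copies=0): rank = 2 + 1 = 3
doc = {'stored': {'3' * 24: {'copies': 1}, 'A' * 24: {'copies': 0}}}
assert rank(doc) == 3
```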
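Building on that, here is a rough sketch of how the IO cost accounting model plays out as a decision tree. The structure mirrors Vigilance.up_rank() in the diff; the `local` and `remote` arguments stand in for the sets of connected and peer-reachable store IDs that Vigilance builds in its constructor, and the free-space checks are omitted:

```python
def choose_action(doc, local, remote):
    # Pick the cheapest rank-increasing action. Verifying costs 1 IO unit
    # (1 read); copying and downloading both cost 2 (1 read, 1 write), but
    # downloading burns scarcer network IO, so it is only used when no
    # local copy exists.
    stored = set(doc['stored'])
    downgraded = {
        store_id for store_id in stored & local
        if doc['stored'][store_id]['copies'] == 0
        # (the real code also checks the copy lacks a 'verified' timestamp)
    }
    if stored & local:
        if downgraded:
            return 'verify'    # cost 1
        if local - stored:
            return 'copy'      # cost 2
    elif stored & remote:
        return 'download'      # cost 2, plus network IO
    return None                # no rank-increasing action this node can take

assert choose_action(
    {'stored': {'3' * 24: {'copies': 0}}}, {'3' * 24}, set()
) == 'verify'
```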
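Finally, the relationship between the two free-space thresholds, as the new unit tests enforce it (values taken directly from the diff below):

```python
GB = 10 ** 9                # base-10 gigabyte
MIN_BYTES_FREE = 16 * GB    # normal copy-increasing must leave this free
MAX_BYTES_FREE = 64 * GB    # preemptive copies and reclaim use this threshold

# Preemptive 4th copies are only created above the larger threshold, so
# there is always ample headroom before reclaim needs to kick in:
assert MAX_BYTES_FREE >= 2 * MIN_BYTES_FREE
```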
Preview Diff
1 | === modified file 'dmedia/core.py' |
2 | --- dmedia/core.py 2013-10-24 02:59:18 +0000 |
3 | +++ dmedia/core.py 2013-11-21 03:22:50 +0000 |
4 | @@ -53,7 +53,8 @@ |
5 | from dmedia import util, schema, views |
6 | from dmedia.client import Downloader, get_client, build_ssl_context |
7 | from dmedia.metastore import MetaStore, create_stored, get_dict |
8 | -from dmedia.local import LocalStores, FileNotLocal, MIN_FREE_SPACE |
9 | +from dmedia.metastore import MIN_BYTES_FREE, MAX_BYTES_FREE |
10 | +from dmedia.local import LocalStores, FileNotLocal |
11 | |
12 | |
13 | log = logging.getLogger() |
14 | @@ -230,65 +231,221 @@ |
15 | log.exception('Error updating project stats for %r', project_id) |
16 | |
17 | |
18 | -def _vigilance_worker(env, ssl_config): |
19 | - """ |
20 | - Run the event-based copy-increasing loop to maintain file durability. |
21 | - """ |
22 | - db = util.get_db(env) |
23 | - ms = MetaStore(db) |
24 | - |
25 | - local_stores = ms.get_local_stores() |
26 | - if len(local_stores) == 0: |
27 | - log.warning('No connected local stores, cannot increase copies') |
28 | - return |
29 | - connected = frozenset(local_stores.ids) |
30 | - log.info('Connected %r', connected) |
31 | - |
32 | - clients = [] |
33 | - peers = ms.get_local_peers() |
34 | - if peers: |
35 | - ssl_context = build_ssl_context(ssl_config) |
36 | - for (peer_id, info) in peers.items(): |
37 | - url = info['url'] |
38 | - log.info('Peer %s at %s', peer_id, url) |
39 | - clients.append(get_client(url, ssl_context)) |
40 | - else: |
41 | - log.info('No known peers on local network') |
42 | - |
43 | - for (doc, stored) in ms.iter_actionable_fragile(connected, True): |
44 | +def get_downgraded(doc): |
45 | + downgraded = [] |
46 | + for (key, value) in doc['stored'].items(): |
47 | + copies = value['copies'] |
48 | + verified = value.get('verified') |
49 | + if copies == 0 and not isinstance(verified, int): |
50 | + downgraded.append(key) |
51 | + return downgraded |
52 | + |
53 | + |
54 | +class Vigilance: |
55 | + def __init__(self, ms, ssl_config): |
56 | + self.ms = ms |
57 | + self.stores = ms.get_local_stores() |
58 | + for fs in self.stores: |
59 | + log.info('Vigilance: local store: %r', fs) |
60 | + self.local = frozenset(self.stores.ids) |
61 | + self.clients = {} |
62 | + self.store_to_peer = {} |
63 | + remote = [] |
64 | + peers = ms.get_local_peers() |
65 | + if peers: |
66 | + ssl_context = build_ssl_context(ssl_config) |
67 | + for (peer_id, info) in peers.items(): |
68 | + url = info['url'] |
69 | + log.info('Vigilance: peer %s at %s', peer_id, url) |
70 | + self.clients[peer_id] = get_client(url, ssl_context) |
71 | + for doc in ms.db.get_many(list(peers)): |
72 | + if doc is not None: |
73 | + for store_id in get_dict(doc, 'stores'): |
74 | + if is_store_id(store_id): |
75 | + remote.append(store_id) |
76 | + self.store_to_peer[store_id] = doc['_id'] |
77 | + assert doc['_id'] in peers |
78 | + self.remote = frozenset(remote) |
79 | + |
80 | + def run(self): |
81 | + last_seq = self.process_backlog() |
82 | + log.info('Vigilance: processed backlog as of %r', last_seq) |
83 | + self.process_preempt() |
84 | + self.run_event_loop(last_seq) |
85 | + |
86 | + def process_backlog(self): |
87 | + for doc in self.ms.iter_fragile_files(): |
88 | + self.wrap_up_rank(doc) |
89 | + return self.ms.db.get()['update_seq'] |
90 | + |
91 | + def process_preempt(self): |
92 | + for doc in self.ms.iter_preempt_files(): |
93 | + self.wrap_up_rank(doc, threshold=MAX_BYTES_FREE) |
94 | + |
95 | + def run_event_loop(self, last_seq): |
96 | + log.info('Vigilance: starting event loop at %d', last_seq) |
97 | + while True: |
98 | + result = self.ms.wait_for_fragile_files(last_seq) |
99 | + last_seq = result['last_seq'] |
100 | + for row in result['results']: |
101 | + self.wrap_up_rank(row['doc']) |
102 | + |
103 | + def wrap_up_rank(self, doc, threshold=MIN_BYTES_FREE): |
104 | + try: |
105 | + return self.up_rank(doc, threshold) |
106 | + except Exception: |
107 | + log.exception('Error calling Vigilance.up_rank() for %r', doc) |
108 | + |
109 | + def up_rank(self, doc, threshold): |
110 | + """ |
111 | + Implements the rank-increasing decision tree. |
112 | + |
113 | + There are 4 possible actions: |
114 | + |
115 | + 1) Verify a local copy currently in a downgraded state |
116 | + |
117 | + 2) Copy from a local FileStore to another local FileStore |
118 | + |
119 | + 3) Download from a remote peer to a local FileStore |
120 | + |
121 | + 4) Do nothing as no rank-increasing action is possible |
122 | + |
123 | + This is a high-level tree based on set operations. The action taken |
124 | + here may not actually be possible because this method doesn't consider |
125 | + whether there is a local FileStore with enough available space; when |
126 | + there isn't, actions (2) and (3) won't be possible. |
127 | + |
128 | + We use a simple IO cost accounting model: reading a copy costs one unit, |
129 | + and writing a copy likewise costs one unit. For example, consider these |
130 | + three operations: |
131 | + |
132 | + ==== ============================================== |
133 | + Cost Action |
134 | + ==== ============================================== |
135 | + 1 Verify a copy (1 read unit) |
136 | + 2 Create a copy (1 read unit, 1 write unit) |
137 | + 3 Create two copies (1 read unit, 2 write units) |
138 | + ==== ============================================== |
139 | + |
140 | + This method will take the least expensive route (in IO cost units) that |
141 | + will increase the file rank by at least 1. |
142 | + |
143 | + It's tempting to look for actions with a lower cost to benefit ratio, |
144 | + even when the cost is higher. For example, consider these actions: |
145 | + |
146 | + ==== ===== ===== ======================== |
147 | + Cost +Rank Ratio Action |
148 | + ==== ===== ===== ======================== |
149 | + 1 1 1.00 Verify a downgraded copy |
150 | + 2 2 1.00 Create a copy |
151 | + 3 4 0.75 Create two copies |
152 | + ==== ===== ===== ======================== |
153 | + |
154 | + In this sense, it's a better deal to create two new copies (which is the |
155 | + action Dmedia formerly would take). However, because greater IO |
156 | + resources are consumed, this means it will necessarily delay acting on |
157 | + other equally fragile files (other files at the current rank being |
158 | + processed). |
159 | + |
160 | + Dmedia will now take the cheapest route to getting all files at rank=1 |
161 | + up to at least rank=2, then getting all files at rank=2 up to at least |
162 | + rank=3, and so on. |
163 | + |
164 | + Another interesting "good deal" is creating new copies by reading from |
165 | + a local downgraded copy (because the source file is always verified as |
166 | + it's read): |
167 | + |
168 | + ==== ===== ===== ======================================== |
169 | + Cost +Rank Ratio Action |
170 | + ==== ===== ===== ======================================== |
171 | + 1 1 1.00 Verify a downgraded copy |
172 | + 2 2 1.00 Create a copy |
173 | + 2 3 0.66 Create a copy from a downgraded copy |
174 | + 3 4 0.75 Create two copies |
175 | + 3 5 0.60 Create two copies from a downgraded copy |
176 | + ==== ===== ===== ======================================== |
177 | + |
178 | + One place where this does make sense is when there is a locally |
179 | + available file at rank=1 (a single physical copy in a downgraded state), |
180 | + and a locally connected FileStore with enough free space to create a new |
181 | + copy. Because having only a single physical copy is so dangerous, |
182 | + it makes sense to bend the rules here. |
183 | + |
184 | + FIXME: Dmedia doesn't yet do this! Probably the best way to implement |
185 | + this is for the decision tree here to work as it does, but to add some |
186 | + special case handling in Vigilance.up_rank_by_verifying(). Assuming the |
187 | + needed space isn't available on another FileStore, we should still at |
188 | + least verify the copy. |
189 | + |
190 | + However, the same will not be done for a file at rank=3 (two physical |
191 | + copies, one in a downgraded state). In this case the downgraded copy |
192 | + will simply be verified, using 1 IO unit and increasing the rank to 4. |
193 | + |
194 | + Note that we use the same cost for a read whether reading from a local |
195 | + drive or downloading from a peer. Although downloading is cheaper when |
196 | + looked at only from the perspective of the local node, it has the same |
197 | + cost when considering the total Dmedia library. |
198 | + |
199 | + The other peers will likewise be doing their best to address any fragile |
200 | + files. And furthermore, local network IO is generally a more scarce |
201 | + resource (especially over WiFi), so we should only download when it's |
202 | + absolutely needed (i.e., when no local copy is available). |
203 | + """ |
204 | + assert isinstance(threshold, int) and threshold > 0 |
205 | + stored = set(doc['stored']) |
206 | + local = stored.intersection(self.local) |
207 | + downgraded = local.intersection(get_downgraded(doc)) |
208 | + free = self.local - stored |
209 | + remote = stored.intersection(self.remote) |
210 | + if local: |
211 | + if downgraded: |
212 | + return self.up_rank_by_verifying(doc, downgraded) |
213 | + elif free: |
214 | + return self.up_rank_by_copying(doc, free, threshold) |
215 | + elif remote: |
216 | + return self.up_rank_by_downloading(doc, remote, threshold) |
217 | + |
218 | + def up_rank_by_verifying(self, doc, downgraded): |
219 | + assert isinstance(downgraded, set) |
220 | + store_id = downgraded.pop() |
221 | + fs = self.stores.by_id(store_id) |
222 | + return self.ms.verify(fs, doc) |
223 | + |
224 | + def up_rank_by_copying(self, doc, free, threshold): |
225 | + dst = self.stores.filter_by_avail(free, doc['bytes'], 1, threshold) |
226 | + if dst: |
227 | + src = self.stores.choose_local_store(doc) |
228 | + return self.ms.copy(src, doc, *dst) |
229 | + |
230 | + def up_rank_by_downloading(self, doc, remote, threshold): |
231 | + fs = self.stores.find_dst_store(doc['bytes'], threshold) |
232 | + if fs is None: |
233 | + return |
234 | + peer_ids = frozenset( |
235 | + self.store_to_peer[store_id] for store_id in remote |
236 | + ) |
237 | + downloader = None |
238 | _id = doc['_id'] |
239 | - copies = sum(v['copies'] for v in doc['stored'].values()) |
240 | - if copies >= 3: |
241 | - log.warning('%s already has copies >= 3, skipping', _id) |
242 | - continue |
243 | - size = doc['bytes'] |
244 | - local = connected.intersection(stored) # Any local copies? |
245 | - if local: |
246 | - free = connected - stored |
247 | - src = local_stores.choose_local_store(doc) |
248 | - dst = local_stores.filter_by_avail(free, size, 3 - copies) |
249 | - if dst: |
250 | - ms.copy(src, doc, *dst) |
251 | - elif clients: |
252 | - fs = local_stores.find_dst_store(size) |
253 | - if fs is None: |
254 | - log.warning( |
255 | - 'No FileStore with avail space to download %s', _id |
256 | - ) |
257 | + for peer_id in peer_ids: |
258 | + client = self.clients[peer_id] |
259 | + if not client.has_file(_id): |
260 | continue |
261 | - for client in clients: |
262 | - if not client.has_file(_id): |
263 | - continue |
264 | - downloader = Downloader(doc, ms, fs) |
265 | - try: |
266 | - downloader.download_from(client) |
267 | - except Exception: |
268 | - log.exception('Error downloading %s from %s', _id, client) |
269 | + if downloader is None: |
270 | + downloader = Downloader(doc, self.ms, fs) |
271 | + try: |
272 | + downloader.download_from(client) |
273 | + except Exception: |
274 | + log.exception('Error downloading %s from %s', _id, client) |
275 | + if downloader.download_is_complete(): |
276 | + return downloader.doc |
277 | |
278 | |
279 | def vigilance_worker(env, ssl_config): |
280 | try: |
281 | - _vigilance_worker(env, ssl_config) |
282 | + db = util.get_db(env) |
283 | + ms = MetaStore(db) |
284 | + vigilance = Vigilance(ms, ssl_config) |
285 | + vigilance.run() |
286 | except Exception: |
287 | log.exception('Error in vigilance_worker():') |
288 | |
289 | @@ -328,7 +485,11 @@ |
290 | |
291 | |
292 | def is_file_id(_id): |
293 | - return isdb32(_id) and len(_id) == 48 |
294 | + return isinstance(_id, str) and len(_id) == 48 and isdb32(_id) |
295 | + |
296 | + |
297 | +def is_store_id(_id): |
298 | + return isinstance(_id, str) and len(_id) == 24 and isdb32(_id) |
299 | |
300 | |
301 | def clean_file_id(_id): |
302 | @@ -531,6 +692,10 @@ |
303 | self.task_manager.requeue_filestore_tasks(tuple(self.stores)) |
304 | |
305 | def restart_vigilance(self): |
306 | + # FIXME: Core should also restart Vigilance whenever the FileStores |
307 | + # connected to a peer change. We should do this by monitoring the |
308 | + # _changes feed for changes to any of the machine docs corresponding to |
309 | + # the currently visible local peers. |
310 | self.task_manager.restart_vigilance() |
311 | |
312 | def get_auto_format(self): |
313 | |
314 | === modified file 'dmedia/local.py' |
315 | --- dmedia/local.py 2013-10-24 02:21:03 +0000 |
316 | +++ dmedia/local.py 2013-11-21 03:22:50 +0000 |
317 | @@ -33,7 +33,6 @@ |
318 | |
319 | |
320 | log = logging.getLogger() |
321 | -MIN_FREE_SPACE = 16 * 1024**3 # 8 GiB min free space |
322 | |
323 | |
324 | class NoSuchFile(Exception): |
325 | @@ -170,24 +169,25 @@ |
326 | reverse=reverse, |
327 | ) |
328 | |
329 | - def filter_by_avail(self, free_set, size, copies_needed): |
330 | - assert isinstance(size, int) and size >= 1 |
331 | - assert isinstance(copies_needed, int) and 1 <= copies_needed <= 3 |
332 | + def filter_by_avail(self, free, size, copies, threshold): |
333 | + assert isinstance(size, int) and size > 0 |
334 | + assert isinstance(copies, int) and 1 <= copies <= 3 |
335 | + assert isinstance(threshold, int) and threshold > 0 |
336 | stores = [] |
337 | - required_avail = size + MIN_FREE_SPACE |
338 | + required_avail = size + threshold |
339 | for fs in self.sort_by_avail(): |
340 | - if fs.id in free_set and fs.statvfs().avail >= required_avail: |
341 | + if fs.id in free and fs.statvfs().avail >= required_avail: |
342 | stores.append(fs) |
343 | - if len(stores) >= copies_needed: |
344 | + if len(stores) >= copies: |
345 | break |
346 | return stores |
347 | |
348 | - def find_dst_store(self, size): |
349 | + def find_dst_store(self, size, threshold): |
350 | stores = self.sort_by_avail() |
351 | if not stores: |
352 | return |
353 | fs = stores[0] |
354 | - if fs.statvfs().avail >= size + MIN_FREE_SPACE: |
355 | + if fs.statvfs().avail >= size + threshold: |
356 | return fs |
357 | |
358 | def local_stores(self): |
359 | |
360 | === modified file 'dmedia/metastore.py' |
361 | --- dmedia/metastore.py 2013-10-30 03:26:10 +0000 |
362 | +++ dmedia/metastore.py 2013-11-21 03:22:50 +0000 |
363 | @@ -67,7 +67,7 @@ |
364 | from random import SystemRandom |
365 | from copy import deepcopy |
366 | |
367 | -from dbase32 import log_id |
368 | +from dbase32 import log_id, isdb32 |
369 | from filestore import FileStore, CorruptFile, FileNotFound, check_root_hash |
370 | from microfiber import NotFound, Conflict, BadRequest, BulkConflict |
371 | from microfiber import id_slice_iter, dumps |
372 | @@ -90,9 +90,9 @@ |
373 | VERIFY_BY_MTIME = DOWNGRADE_BY_MTIME // 8 |
374 | VERIFY_BY_VERIFIED = DOWNGRADE_BY_VERIFIED // 2 |
375 | |
376 | -GiB = 1024**3 |
377 | -RECLAIM_BYTES = 64 * GiB |
378 | - |
379 | +GB = 1000000000 |
380 | +MIN_BYTES_FREE = 16 * GB |
381 | +MAX_BYTES_FREE = 64 * GB |
382 | |
383 | |
384 | class TimeDelta: |
385 | @@ -140,6 +140,139 @@ |
386 | return d[key] |
387 | |
388 | |
389 | +def get_int(d, key): |
390 | + """ |
391 | + Force value for *key* in *d* to be an ``int`` >= 0. |
392 | + |
393 | + For example: |
394 | + |
395 | + >>> doc = {'foo': 'BAR'} |
396 | + >>> get_int(doc, 'foo') |
397 | + 0 |
398 | + >>> doc |
399 | + {'foo': 0} |
400 | + |
401 | + """ |
402 | + if not isinstance(d, dict): |
403 | + raise TypeError(TYPE_ERROR.format('d', dict, type(d), d)) |
404 | + if not isinstance(key, str): |
405 | + raise TypeError(TYPE_ERROR.format('key', str, type(key), key)) |
406 | + value = d.get(key) |
407 | + if isinstance(value, int) and value >= 0: |
408 | + return value |
409 | + d[key] = 0 |
410 | + return d[key] |
411 | + |
412 | + |
413 | +def get_rank(doc): |
414 | + """ |
415 | + Calculate the rank of the file represented by *doc*. |
416 | + |
417 | + The rank of a file is the number of physical drives it's assumed to be stored |
418 | + upon plus the sum of the assumed durability of those copies, basically:: |
419 | + |
420 | + rank = len(doc['stored']) + sum(v['copies'] for v in doc['stored'].values()) |
421 | + |
422 | + However, this function can cope with an arbitrarily broken *doc*, as long as |
423 | + *doc* is at least a ``dict`` instance. For example: |
424 | + |
425 | + >>> doc = { |
426 | + ... 'stored': { |
427 | + ... '333333333333333333333333': {'copies': 1}, |
428 | + ... '999999999999999999999999': {'copies': -6}, |
429 | + ... 'AAAAAAAAAAAAAAAAAAAAAAAA': 'junk', |
430 | + ... 'YYYYYYYYYYYYYYYY': 'store_id too short', |
431 | + ... 42: 'the ultimate key to the ultimate value', |
432 | + ... }, |
433 | + ... } |
434 | + >>> get_rank(doc) |
435 | + 4 |
436 | + |
437 | + Any needed schema coercion is done in-place: |
438 | + |
439 | + >>> doc == { |
440 | + ... 'stored': { |
441 | + ... '333333333333333333333333': {'copies': 1}, |
442 | + ... '999999999999999999999999': {'copies': 0}, |
443 | + ... 'AAAAAAAAAAAAAAAAAAAAAAAA': {'copies': 0}, |
444 | + ... }, |
445 | + ... } |
446 | + True |
447 | + |
448 | + It even works with an empty doc: |
449 | + |
450 | + >>> doc = {} |
451 | + >>> get_rank(doc) |
452 | + 0 |
453 | + >>> doc |
454 | + {'stored': {}} |
455 | + |
456 | + The rank of a file is used to order (prioritize) the copy-increasing |
457 | + behavior, which is done from lowest rank to highest rank (from most fragile |
458 | + to least fragile). |
459 | + |
460 | + Also see the "file/rank" CouchDB view function in `dmedia.views`. |
461 | + """ |
462 | + stored = get_dict(doc, 'stored') |
463 | + copies = 0 |
464 | + for key in tuple(stored): |
465 | + if isinstance(key, str) and len(key) == 24 and isdb32(key): |
466 | + value = get_dict(stored, key) |
467 | + copies += get_int(value, 'copies') |
468 | + else: |
469 | + del stored[key] |
470 | + return min(3, len(stored)) + min(3, copies) |
471 | + |
472 | + |
473 | +def get_copies(doc): |
474 | + """ |
475 | + Calculate the durability of the file represented by *doc*. |
476 | + |
477 | + For example: |
478 | + |
479 | + >>> doc = { |
480 | + ... 'stored': { |
481 | + ... '333333333333333333333333': {'copies': 1}, |
482 | + ... '999999999999999999999999': {'copies': -6}, |
483 | + ... 'AAAAAAAAAAAAAAAAAAAAAAAA': 'junk', |
484 | + ... 'YYYYYYYYYYYYYYYY': 'store_id too short', |
485 | + ... 42: 'the ultimate key to the ultimate value', |
486 | + ... }, |
487 | + ... } |
488 | + >>> get_copies(doc) |
489 | + 1 |
490 | + |
491 | + Any needed schema coercion is done in-place: |
492 | + |
493 | + >>> doc == { |
494 | + ... 'stored': { |
495 | + ... '333333333333333333333333': {'copies': 1}, |
496 | + ... '999999999999999999999999': {'copies': 0}, |
497 | + ... 'AAAAAAAAAAAAAAAAAAAAAAAA': {'copies': 0}, |
498 | + ... }, |
499 | + ... } |
500 | + True |
501 | + |
502 | + It even works with an empty doc: |
503 | + |
504 | + >>> doc = {} |
505 | + >>> get_copies(doc) |
506 | + 0 |
507 | + >>> doc |
508 | + {'stored': {}} |
509 | + |
510 | + """ |
511 | + stored = get_dict(doc, 'stored') |
512 | + copies = 0 |
513 | + for key in tuple(stored): |
514 | + if isinstance(key, str) and len(key) == 24 and isdb32(key): |
515 | + value = get_dict(stored, key) |
516 | + copies += get_int(value, 'copies') |
517 | + else: |
518 | + del stored[key] |
519 | + return copies |
520 | + |
521 | + |
522 | def get_mtime(fs, _id): |
523 | return int(fs.stat(_id).mtime) |
524 | |
525 | @@ -564,7 +697,7 @@ |
526 | count += len(docs) |
527 | try: |
528 | self.db.save_many(docs) |
529 | - except BulkConflict: |
530 | + except BulkConflict as e: |
531 | log.exception('Conflict purging %s', store_id) |
532 | count -= len(e.conflicts) |
533 | try: |
534 | @@ -598,21 +731,21 @@ |
535 | |
536 | A fundamental design tenet of Dmedia is that it doesn't particularly |
537 | trust its metadata, and instead does frequent reality checks. This |
538 | - allows Dmedia to work even though removable storage is constantly |
539 | - "offline". In other distributed file-systems, this is usually called |
540 | - being in a "network-partitioned" state. |
541 | + allows Dmedia to work even though removable storage is often offline, |
542 | + meaning the overall Dmedia library is often in a network-partitioned |
543 | + state even when all the peers in the library might be online. |
544 | |
545 | Dmedia deals with removable storage via a quickly decaying confidence |
546 | in its metadata. If a removable drive hasn't been connected longer |
547 | than some threshold, Dmedia will update all those copies to count for |
548 | zero durability. |
549 | |
550 | - And whenever a removable drive (on any drive for that matter) is |
551 | - connected, Dmedia immediately checks to see what files are actually on |
552 | - the drive, and whether they have good integrity. |
553 | + Whenever a removable drive (or any drive for that matter) is connected, |
554 | + Dmedia immediately checks to see what files are actually on the drive, |
555 | + and whether they have good integrity. |
556 | |
557 | `MetaStore.scan()` is the most important reality check that Dmedia does |
558 | - because it's fast and can therefor be done quite often. Thousands of |
559 | + because it's fast and can therefore be done frequently. Thousands of |
560 | files can be scanned in a few seconds. |
561 | |
562 | The scan insures that for every file expected in this file-store, the |
563 | @@ -625,12 +758,13 @@ |
564 | the file-store. Then the doc is updated accordingly marking the file as |
565 | being corrupt in this file-store, and the doc is saved. |
566 | |
567 | - If the file doesn't have the expected mtime is this file-store, this |
568 | + If the file doesn't have the expected mtime in this file-store, this |
569 | copy gets downgraded to zero copies worth of durability, and the last |
570 | verification timestamp is deleted, if present. This will put the file |
571 | first in line for full content-hash verification. If the verification |
572 | - passes, the durability is raised back to the appropriate number of |
573 | - copies. |
574 | + passes, the durability will be raised back to the appropriate number of |
575 | + copies (although note this is done by `MetaStore.verify_by_downgraded()`, |
576 | + not by this method). |
577 | |
578 | :param fs: a `FileStore` instance |
579 | """ |
580 | @@ -677,6 +811,7 @@ |
581 | except NotFound: |
582 | doc = deepcopy(fs.doc) |
583 | doc['atime'] = int(time.time()) |
584 | + doc['bytes_avail'] = fs.statvfs().avail |
585 | self.db.save(doc) |
586 | t.log('scan %r files in %r', count, fs) |
587 | return count |
588 | @@ -856,37 +991,60 @@ |
589 | new = create_stored(doc['_id'], fs) |
590 | return self.db.update(mark_added, doc, new) |
591 | |
592 | - def iter_fragile(self, monitor=False): |
593 | - """ |
594 | - Yield doc for each fragile file. |
595 | - """ |
596 | - for rank in range(6): |
597 | - result = self.db.view('file', 'rank', key=rank, update_seq=True) |
598 | - update_seq = result.get('update_seq') |
599 | - ids = [row['id'] for row in result['rows']] |
600 | - del result # result might be quite large, free some memory |
601 | + def iter_files_at_rank(self, rank): |
602 | + if not isinstance(rank, int): |
603 | + raise TypeError(TYPE_ERROR.format('rank', int, type(rank), rank)) |
604 | + if not (0 <= rank <= 5): |
605 | + raise ValueError('Need 0 <= rank <= 5; got {}'.format(rank)) |
606 | + LIMIT = 50 |
607 | + kw = { |
608 | + 'limit': LIMIT, |
609 | + 'key': rank, |
610 | + } |
611 | + while True: |
612 | + rows = self.db.view('file', 'rank', **kw)['rows'] |
613 | + if not rows: |
614 | + break |
615 | + ids = [r['id'] for r in rows] |
616 | + if ids[0] == kw.get('startkey_docid'): |
617 | + ids.pop(0) |
618 | + if not ids: |
619 | + break |
620 | + log.info('Considering %d files at rank=%d starting at %s', |
621 | + len(ids), rank, ids[0] |
622 | + ) |
623 | random.shuffle(ids) |
624 | - log.info('vigilance: %d files at rank=%d', len(ids), rank) |
625 | for _id in ids: |
626 | - yield self.db.get(_id) |
627 | - if not monitor: |
628 | - return |
629 | - |
630 | - # Now we enter an event-based loop using the _changes feed: |
631 | - if update_seq is None: |
632 | - update_seq = self.db.get()['update_seq'] |
633 | + try: |
634 | + doc = self.db.get(_id) |
635 | + doc_rank = get_rank(doc) |
636 | + if doc_rank <= rank: |
637 | + yield doc |
638 | + else: |
639 | + log.info('Now at rank %d > %d, skipping %s', |
640 | + doc_rank, rank, doc.get('_id') |
641 | + ) |
642 | + except NotFound: |
643 | + log.warning('doc NotFound for %s at rank=%d', _id, rank) |
644 | + if len(rows) < LIMIT: |
645 | + break |
646 | + kw['startkey_docid'] = rows[-1]['id'] |
647 | + |
648 | + def iter_fragile_files(self): |
649 | + for rank in range(6): |
650 | + for doc in self.iter_files_at_rank(rank): |
651 | + yield doc |
652 | + |
653 | + def wait_for_fragile_files(self, last_seq): |
654 | kw = { |
655 | 'feed': 'longpoll', |
656 | 'include_docs': True, |
657 | 'filter': 'file/fragile', |
658 | - 'since': update_seq, |
659 | + 'since': last_seq, |
660 | } |
661 | while True: |
662 | try: |
663 | - result = self.db.get('_changes', **kw) |
664 | - for row in result['results']: |
665 | - yield row['doc'] |
666 | - kw['since'] = result['last_seq'] |
667 | + return self.db.get('_changes', **kw) |
668 | # FIXME: Sometimes we get a 400 Bad Request from CouchDB, perhaps |
669 | # when `since` gets ahead of the `update_seq` as viewed by the |
670 | # changes feed? By excepting `BadRequest` here, we prevent the |
671 | @@ -900,21 +1058,29 @@ |
672 | except (ResponseNotReady, BadRequest): |
673 | pass |
674 | |
675 | - def iter_actionable_fragile(self, connected, monitor=False): |
676 | - """ |
677 | - Yield doc for each fragile file that this node might be able to fix. |
678 | - |
679 | - To be "actionable", this machine must have at least one currently |
680 | - connected FileStore (drive) that does *not* already contain a copy of |
681 | - the fragile file. |
682 | - """ |
683 | - assert isinstance(connected, frozenset) |
684 | - for doc in self.iter_fragile(monitor): |
685 | - stored = frozenset(get_dict(doc, 'stored')) |
686 | - if (connected - stored): |
687 | - yield (doc, stored) |
688 | - |
689 | - def reclaim(self, fs, threshold=RECLAIM_BYTES): |
690 | + def iter_preempt_files(self): |
691 | + kw = { |
692 | + 'limit': 100, |
693 | + 'descending': True, |
694 | + } |
695 | + rows = self.db.view('file', 'preempt', **kw)['rows'] |
696 | + if not rows: |
697 | + return |
698 | + ids = [r['id'] for r in rows] |
699 | + log.info('Considering %d files for preemptive copy increasing', len(ids)) |
700 | + random.shuffle(ids) |
701 | + for _id in ids: |
702 | + try: |
703 | + doc = self.db.get(_id) |
704 | + copies = get_copies(doc) |
705 | + if copies == 3: |
706 | + yield doc |
707 | + else: |
708 | + log.info('Now at copies=%d, skipping %s', copies, _id) |
709 | + except NotFound: |
710 | + log.warning('preempt doc NotFound for %s', _id) |
711 | + |
712 | + def reclaim(self, fs, threshold=MAX_BYTES_FREE): |
713 | count = 0 |
714 | size = 0 |
715 | t = TimeDelta() |
716 | @@ -936,7 +1102,7 @@ |
717 | t.log('reclaim %s in %r', count_and_size(count, size), fs) |
718 | return (count, size) |
719 | |
720 | - def reclaim_all(self, threshold=RECLAIM_BYTES): |
721 | + def reclaim_all(self, threshold=MAX_BYTES_FREE): |
722 | try: |
723 | count = 0 |
724 | size = 0 |
725 | |
726 | === modified file 'dmedia/tests/test_core.py' |
727 | --- dmedia/tests/test_core.py 2013-10-24 02:59:18 +0000 |
728 | +++ dmedia/tests/test_core.py 2013-11-21 03:22:50 +0000 |
729 | @@ -230,6 +230,150 @@ |
730 | }) |
731 | |
732 | |
733 | +class TestVigilanceMocked(TestCase): |
734 | + def test_up_rank(self): |
735 | + class Mocked(core.Vigilance): |
736 | + def __init__(self, local, remote): |
737 | + self.local = frozenset(local) |
738 | + self.remote = frozenset(remote) |
739 | + self._calls = [] |
740 | + |
741 | + def up_rank_by_verifying(self, doc, downgraded): |
742 | + self._calls.extend(('verify', doc, downgraded)) |
743 | + return doc |
744 | + |
745 | + def up_rank_by_copying(self, doc, free, threshold): |
746 | + self._calls.extend(('copy', doc, free, threshold)) |
747 | + return doc |
748 | + |
749 | + def up_rank_by_downloading(self, doc, remote, threshold): |
750 | + self._calls.extend(('download', doc, remote, threshold)) |
751 | + return doc |
752 | + |
753 | + local = tuple(random_id() for i in range(2)) |
754 | + remote = tuple(random_id() for i in range(2)) |
755 | + mocked = Mocked(local, remote) |
756 | + |
757 | + # Verify, one local: |
758 | + doc = { |
759 | + 'stored': { |
760 | + local[0]: {'copies': 0}, |
761 | + }, |
762 | + } |
763 | + self.assertIs(mocked.up_rank(doc, 17), doc) |
764 | + self.assertEqual(mocked._calls, |
765 | + ['verify', doc, {local[0]}] |
766 | + ) |
767 | + |
768 | + # Verify, two local: |
769 | + doc = { |
770 | + 'stored': { |
771 | + local[0]: {'copies': 0}, |
772 | + local[1]: {'copies': 1}, |
773 | + }, |
774 | + } |
775 | + mocked._calls.clear() |
776 | + self.assertIs(mocked.up_rank(doc, 17), doc) |
777 | + self.assertEqual(mocked._calls, |
778 | + ['verify', doc, {local[0]}] |
779 | + ) |
780 | + |
781 | + # Verify, one local, one remote: |
782 | + doc = { |
783 | + 'stored': { |
784 | + local[0]: {'copies': 0}, |
785 | + remote[0]: {'copies': 1}, |
786 | + }, |
787 | + } |
788 | + mocked._calls.clear() |
789 | + self.assertIs(mocked.up_rank(doc, 17), doc) |
790 | + self.assertEqual(mocked._calls, |
791 | + ['verify', doc, {local[0]}] |
792 | + ) |
793 | + |
794 | + # Copy, one local, one remote: |
795 | + doc = { |
796 | + 'stored': { |
797 | + local[0]: {'copies': 1}, |
798 | + remote[0]: {'copies': 1}, |
799 | + }, |
800 | + } |
801 | + mocked._calls.clear() |
802 | + self.assertIs(mocked.up_rank(doc, 17), doc) |
803 | + self.assertEqual(mocked._calls, |
804 | + ['copy', doc, {local[1]}, 17] |
805 | + ) |
806 | + |
807 | + # Copy, two local, one remote: |
808 | + doc = { |
809 | + 'stored': { |
810 | + local[0]: {'copies': 1}, |
811 | + local[1]: {'copies': 1}, |
812 | + remote[0]: {'copies': 1}, |
813 | + }, |
814 | + } |
815 | + mocked._calls.clear() |
816 | + self.assertIsNone(mocked.up_rank(doc, 17)) |
817 | + self.assertEqual(mocked._calls, []) |
818 | + |
819 | + # Download, one remote: |
820 | + doc = { |
821 | + 'stored': { |
822 | + remote[0]: {'copies': 0}, |
823 | + }, |
824 | + } |
825 | + mocked._calls.clear() |
826 | + self.assertIs(mocked.up_rank(doc, 17), doc) |
827 | + self.assertEqual(mocked._calls, |
828 | + ['download', doc, {remote[0]}, 17] |
829 | + ) |
830 | + |
831 | + # Download, two remote: |
832 | + doc = { |
833 | + 'stored': { |
834 | + remote[0]: {'copies': 0}, |
835 | + remote[1]: {'copies': 1}, |
836 | + }, |
837 | + } |
838 | + mocked._calls.clear() |
839 | + self.assertIs(mocked.up_rank(doc, 17), doc) |
840 | + self.assertEqual(mocked._calls, |
841 | + ['download', doc, set(remote), 17] |
842 | + ) |
843 | + |
844 | + # Available in neither local nor remote: |
845 | + doc = { |
846 | + 'stored': { |
847 | + random_id(): {'copies': 0}, |
848 | + random_id(): {'copies': 1}, |
849 | + }, |
850 | + } |
851 | + mocked._calls.clear() |
852 | + self.assertIsNone(mocked.up_rank(doc, 17)) |
853 | + self.assertEqual(mocked._calls, []) |
854 | + |
855 | + # Empty doc['stored']: |
856 | + doc = {'stored': {}} |
857 | + mocked._calls.clear() |
858 | + self.assertIsNone(mocked.up_rank(doc, 17)) |
859 | + self.assertEqual(mocked._calls, []) |
860 | + |
861 | + |
862 | +class TestVigilance(CouchCase): |
863 | + def test_init(self): |
864 | + db = util.get_db(self.env, True) |
865 | + ms = MetaStore(db) |
866 | + inst = core.Vigilance(ms, None) |
867 | + self.assertIs(inst.ms, ms) |
868 | + self.assertIsInstance(inst.stores, LocalStores) |
869 | + self.assertIsInstance(inst.local, frozenset) |
870 | + self.assertEqual(inst.local, frozenset()) |
871 | + self.assertIsInstance(inst.remote, frozenset) |
872 | + self.assertEqual(inst.remote, frozenset()) |
873 | + self.assertEqual(inst.clients, {}) |
874 | + self.assertEqual(inst.store_to_peer, {}) |
875 | + |
876 | + |
877 | class TestTaskQueue(TestCase): |
878 | def test_init(self): |
879 | tq = core.TaskQueue() |
880 | |
881 | === modified file 'dmedia/tests/test_local.py' |
882 | --- dmedia/tests/test_local.py 2013-08-25 20:30:25 +0000 |
883 | +++ dmedia/tests/test_local.py 2013-11-21 03:22:50 +0000 |
884 | @@ -181,6 +181,43 @@ |
885 | self.assertEqual(inst.fast, set()) |
886 | self.assertEqual(inst.slow, set()) |
887 | |
888 | + def test_filter_by_avail(self): |
889 | + # FIXME: Improve this once we can mock FileStore.statvfs() |
890 | + inst = local.LocalStores() |
891 | + |
892 | + # Empty |
893 | + self.assertEqual(inst.filter_by_avail({}, 1, 1, 1), []) |
894 | + free = {random_id(), random_id()} |
895 | + self.assertEqual(inst.filter_by_avail(free, 1, 1, 1), []) |
896 | + |
897 | + # One FileStore |
898 | + fs1 = TempFileStore() |
899 | + inst.add(fs1) |
900 | + self.assertEqual(inst.filter_by_avail({}, 1, 1, 1), []) |
901 | + free = {random_id(), random_id()} |
902 | + self.assertEqual(inst.filter_by_avail(free, 1, 1, 1), []) |
903 | + free = {fs1.id, random_id(), random_id()} |
904 | + self.assertEqual(inst.filter_by_avail(free, 1, 1, 1), [fs1]) |
905 | + |
906 | + # Two FileStore |
907 | + fs2 = TempFileStore() |
908 | + inst.add(fs2) |
909 | + self.assertEqual(inst.filter_by_avail({}, 1, 1, 1), []) |
910 | + free = {random_id(), random_id()} |
911 | + self.assertEqual(inst.filter_by_avail(free, 1, 1, 1), []) |
912 | + free = {fs1.id, random_id(), random_id()} |
913 | + self.assertEqual(inst.filter_by_avail(free, 1, 1, 1), [fs1]) |
914 | + free = {fs2.id, random_id(), random_id()} |
915 | + self.assertEqual(inst.filter_by_avail(free, 1, 1, 1), [fs2]) |
916 | + |
917 | + def test_find_dst_store(self): |
918 | + # FIXME: Improve this once we can mock FileStore.statvfs() |
919 | + inst = local.LocalStores() |
920 | + self.assertIsNone(inst.find_dst_store(1, 1)) |
921 | + fs = TempFileStore() |
922 | + inst.add(fs) |
923 | + self.assertIs(inst.find_dst_store(1, 1), fs) |
924 | + |
925 | def test_local_stores(self): |
926 | fs1 = TempFileStore(copies=1) |
927 | fs2 = TempFileStore(copies=0) |
928 | |
929 | === modified file 'dmedia/tests/test_metastore.py' |
930 | --- dmedia/tests/test_metastore.py 2013-10-30 03:26:10 +0000 |
931 | +++ dmedia/tests/test_metastore.py 2013-11-21 03:22:50 +0000 |
932 | @@ -44,11 +44,91 @@ |
933 | from dmedia import util, schema, metastore |
934 | from dmedia.metastore import create_stored, get_mtime |
935 | from dmedia.constants import TYPE_ERROR |
936 | +from dmedia.units import bytes10 |
937 | |
938 | |
939 | random = SystemRandom() |
940 | |
941 | |
942 | +def doc_id(doc): |
943 | + """ |
944 | + Used as key function for sorted by doc['_id']. |
945 | + """ |
946 | + return doc['_id'] |
947 | + |
948 | + |
949 | +def build_stored_at_rank(rank, store_ids): |
950 | + """ |
951 | + Build doc['stored'] for a specific rank. |
952 | + |
953 | + For example: |
954 | + |
955 | + >>> store_ids = ( |
956 | + ... '333333333333333333333333', |
957 | + ... 'AAAAAAAAAAAAAAAAAAAAAAAA', |
958 | + ... 'YYYYYYYYYYYYYYYYYYYYYYYY', |
959 | + ... ) |
960 | + >>> build_stored_at_rank(0, store_ids) |
961 | + {} |
962 | + >>> build_stored_at_rank(1, store_ids) |
963 | + {'333333333333333333333333': {'copies': 0}} |
964 | + >>> build_stored_at_rank(2, store_ids) |
965 | + {'333333333333333333333333': {'copies': 1}} |
966 | + |
967 | + """ |
968 | + assert isinstance(rank, int) |
969 | + assert 0 <= rank <= 6 |
970 | + assert isinstance(store_ids, tuple) |
971 | + assert len(store_ids) == 3 |
972 | + for _id in store_ids: |
973 | + assert isinstance(_id, str) and len(_id) == 24 and isdb32(_id) |
974 | + if rank == 0: |
975 | + return {} |
976 | + if rank == 1: |
977 | + return { |
978 | + store_ids[0]: {'copies': 0}, |
979 | + } |
980 | + if rank == 2: |
981 | + return { |
982 | + store_ids[0]: {'copies': 1}, |
983 | + } |
984 | + if rank == 3: |
985 | + return { |
986 | + store_ids[0]: {'copies': 1}, |
987 | + store_ids[1]: {'copies': 0}, |
988 | + } |
989 | + if rank == 4: |
990 | + return { |
991 | + store_ids[0]: {'copies': 1}, |
992 | + store_ids[1]: {'copies': 1}, |
993 | + } |
994 | + if rank == 5: |
995 | + return { |
996 | + store_ids[0]: {'copies': 1}, |
997 | + store_ids[1]: {'copies': 1}, |
998 | + store_ids[2]: {'copies': 0}, |
999 | + } |
1000 | + if rank == 6: |
1001 | + return { |
1002 | + store_ids[0]: {'copies': 1}, |
1003 | + store_ids[1]: {'copies': 1}, |
1004 | + store_ids[2]: {'copies': 1}, |
1005 | + } |
1006 | + raise Exception('should not have reached this point') |
1007 | + |
1008 | + |
1009 | +def build_file_at_rank(_id, rank, store_ids): |
1010 | + assert isinstance(_id, str) and len(_id) == 48 and isdb32(_id) |
1011 | + doc = { |
1012 | + '_id': _id, |
1013 | + 'type': 'dmedia/file', |
1014 | + 'origin': 'user', |
1015 | + 'stored': build_stored_at_rank(rank, store_ids), |
1016 | + } |
1017 | + assert metastore.get_rank(doc) == rank |
1018 | + return doc |
1019 | + |
1020 | + |
1021 | def create_random_file(fs, db): |
1022 | tmp_fp = fs.allocate_tmp() |
1023 | ch = write_random(tmp_fp) |
1024 | @@ -196,7 +276,28 @@ |
1025 | self.assertTrue( |
1026 | parent // 4 <= metastore.VERIFY_BY_VERIFIED <= parent // 2 |
1027 | ) |
1028 | - self.assertGreater(metastore.VERIFY_BY_VERIFIED, metastore.VERIFY_BY_MTIME) |
1029 | + self.assertGreater(metastore.VERIFY_BY_VERIFIED, metastore.VERIFY_BY_MTIME) |
1030 | + |
1031 | + def test_GB(self): |
1032 | + self.assertIsInstance(metastore.GB, int) |
1033 | + self.assertEqual(metastore.GB, 10 ** 9) |
1034 | + self.assertEqual(metastore.GB, 1000 ** 3) |
1035 | + self.assertEqual(bytes10(metastore.GB), '1 GB') |
1036 | + |
1037 | + def test_MIN_BYTES_FREE(self): |
1038 | + self.assertIsInstance(metastore.MIN_BYTES_FREE, int) |
1039 | + self.assertGreaterEqual(metastore.MIN_BYTES_FREE, metastore.GB) |
1040 | + self.assertEqual(metastore.MIN_BYTES_FREE % metastore.GB, 0) |
1041 | + self.assertEqual(bytes10(metastore.MIN_BYTES_FREE), '16 GB') |
1042 | + |
1043 | + def test_MAX_BYTES_FREE(self): |
1044 | + self.assertIsInstance(metastore.MAX_BYTES_FREE, int) |
1045 | + self.assertGreaterEqual(metastore.MAX_BYTES_FREE, metastore.GB) |
1046 | + self.assertEqual(metastore.MAX_BYTES_FREE % metastore.GB, 0) |
1047 | + self.assertGreaterEqual(metastore.MAX_BYTES_FREE, |
1048 | + 2 * metastore.MIN_BYTES_FREE |
1049 | + ) |
1050 | + self.assertEqual(bytes10(metastore.MAX_BYTES_FREE), '64 GB') |
1051 | |
1052 | |
1053 | class TestFunctions(TestCase): |
1054 | @@ -243,6 +344,123 @@ |
1055 | self.assertEqual(doc, {'foo': {'bar': 0, 'baz': 1}}) |
1056 | self.assertIs(doc['foo'], ret) |
1057 | |
1058 | + def test_get_int(self): |
1059 | + # Bad `d` type: |
1060 | + bad = [random_id(), random_id()] |
1061 | + with self.assertRaises(TypeError) as cm: |
1062 | + metastore.get_int(bad, random_id()) |
1063 | + self.assertEqual( |
1064 | + str(cm.exception), |
1065 | + TYPE_ERROR.format('d', dict, list, bad) |
1066 | + ) |
1067 | + |
1068 | + # Bad `key` type: |
1069 | + bad = random.randint(0, 1000) |
1070 | + with self.assertRaises(TypeError) as cm: |
1071 | + metastore.get_int({}, bad) |
1072 | + self.assertEqual( |
1073 | + str(cm.exception), |
1074 | + TYPE_ERROR.format('key', str, int, bad) |
1075 | + ) |
1076 | + |
1077 | + # Empty: |
1078 | + doc = {} |
1079 | + ret = metastore.get_int(doc, 'foo') |
1080 | + self.assertIsInstance(ret, int) |
1081 | + self.assertEqual(ret, 0) |
1082 | + self.assertEqual(doc, {'foo': 0}) |
1083 | + self.assertIs(doc['foo'], ret) |
1084 | + |
1085 | + # Wrong type: |
1086 | + doc = {'foo': '17'} |
1087 | + ret = metastore.get_int(doc, 'foo') |
1088 | + self.assertIsInstance(ret, int) |
1089 | + self.assertEqual(ret, 0) |
1090 | + self.assertEqual(doc, {'foo': 0}) |
1091 | + self.assertIs(doc['foo'], ret) |
1092 | + |
1093 | + # Trickier wrong type: |
1094 | + doc = {'foo': 17.0} |
1095 | + ret = metastore.get_int(doc, 'foo') |
1096 | + self.assertIsInstance(ret, int) |
1097 | + self.assertEqual(ret, 0) |
1098 | + self.assertEqual(doc, {'foo': 0}) |
1099 | + self.assertIs(doc['foo'], ret) |
1100 | + |
1101 | + # Bad Value: |
1102 | + doc = {'foo': -17} |
1103 | + ret = metastore.get_int(doc, 'foo') |
1104 | + self.assertIsInstance(ret, int) |
1105 | + self.assertEqual(ret, 0) |
1106 | + self.assertEqual(doc, {'foo': 0}) |
1107 | + self.assertIs(doc['foo'], ret) |
1108 | + |
1109 | + # Another bad Value: |
1110 | + doc = {'foo': -1} |
1111 | + ret = metastore.get_int(doc, 'foo') |
1112 | + self.assertIsInstance(ret, int) |
1113 | + self.assertEqual(ret, 0) |
1114 | + self.assertEqual(doc, {'foo': 0}) |
1115 | + self.assertIs(doc['foo'], ret) |
1116 | + |
1117 | + # All good: |
1118 | + value = 17 |
1119 | + doc = {'foo': value} |
1120 | + self.assertIs(metastore.get_int(doc, 'foo'), value) |
1121 | + self.assertEqual(doc, {'foo': 17}) |
1122 | + self.assertIs(doc['foo'], value) |
1123 | + |
1124 | + # Also all good: |
1125 | + value = 0 |
1126 | + doc = {'foo': value} |
1127 | + self.assertIs(metastore.get_int(doc, 'foo'), value) |
1128 | + self.assertEqual(doc, {'foo': 0}) |
1129 | + self.assertIs(doc['foo'], value) |
1130 | + |
1131 | + def test_get_rank(self): |
1132 | + file_id = random_file_id() |
1133 | + store_ids = tuple(random_id() for i in range(3)) |
1134 | + for rank in range(7): |
1135 | + doc = build_file_at_rank(file_id, rank, store_ids) |
1136 | + self.assertEqual(metastore.get_rank(doc), rank) |
1137 | + |
1138 | + # Empty doc |
1139 | + doc = {} |
1140 | + self.assertEqual(metastore.get_rank(doc), 0) |
1141 | + self.assertEqual(doc, {'stored': {}}) |
1142 | + |
1143 | + # Empty doc['stored'] |
1144 | + doc = {'stored': {}} |
1145 | + self.assertEqual(metastore.get_rank(doc), 0) |
1146 | + self.assertEqual(doc, {'stored': {}}) |
1147 | + |
1148 | + # All kinds of broken: |
1149 | + store_ids = tuple(random_id() for i in range(6)) |
1150 | + doc = { |
1151 | + 'stored': { |
1152 | + store_ids[0]: {'copies': 1}, |
1153 | + store_ids[1]: {'copies': '17'}, |
1154 | + store_ids[2]: {'copies': -18}, |
1155 | + store_ids[3]: {}, |
1156 | + store_ids[4]: 'hello', |
1157 | + store_ids[5]: 3, |
1158 | + random_id(10): {'copies': 1}, |
1159 | + ('a' * 24): {'copies': 1}, |
1160 | + 42: {'copies': 1}, |
1161 | + }, |
1162 | + } |
1163 | + self.assertEqual(metastore.get_rank(doc), 4) |
1164 | + self.assertEqual(doc, { |
1165 | + 'stored': { |
1166 | + store_ids[0]: {'copies': 1}, |
1167 | + store_ids[1]: {'copies': 0}, |
1168 | + store_ids[2]: {'copies': 0}, |
1169 | + store_ids[3]: {'copies': 0}, |
1170 | + store_ids[4]: {'copies': 0}, |
1171 | + store_ids[5]: {'copies': 0}, |
1172 | + }, |
1173 | + }) |
1174 | + |
1175 | def test_get_mtime(self): |
1176 | fs = TempFileStore() |
1177 | _id = random_file_id() |
1178 | @@ -2791,6 +3009,7 @@ |
1179 | |
1180 | doc = db.get(fs.id) |
1181 | self.assertTrue(doc['_rev'].startswith('2-')) |
1182 | + self.assertIn('bytes_avail', doc) |
1183 | atime = doc.get('atime') |
1184 | self.assertIsInstance(atime, int) |
1185 | self.assertLessEqual(atime, int(time.time())) |
1186 | @@ -3643,59 +3862,93 @@ |
1187 | }, |
1188 | }) |
1189 | |
1190 | - def test_iter_fragile(self): |
1191 | + def test_iter_files_at_rank(self): |
1192 | db = util.get_db(self.env, True) |
1193 | ms = metastore.MetaStore(db) |
1194 | |
1195 | + # Bad rank type: |
1196 | + with self.assertRaises(TypeError) as cm: |
1197 | + list(ms.iter_files_at_rank(1.0)) |
1198 | + self.assertEqual(str(cm.exception), |
1199 | + TYPE_ERROR.format('rank', int, float, 1.0) |
1200 | + ) |
1201 | + |
1202 | + # Bad rank value: |
1203 | + with self.assertRaises(ValueError) as cm: |
1204 | + list(ms.iter_files_at_rank(-1)) |
1205 | + self.assertEqual(str(cm.exception), 'Need 0 <= rank <= 5; got -1') |
1206 | + with self.assertRaises(ValueError) as cm: |
1207 | + list(ms.iter_files_at_rank(6)) |
1208 | + self.assertEqual(str(cm.exception), 'Need 0 <= rank <= 5; got 6') |
1209 | + |
1210 | # Test when no files are in the library: |
1211 | - self.assertEqual(list(ms.iter_fragile()), []) |
1212 | + self.assertEqual(list(ms.iter_files_at_rank(0)), []) |
1213 | + self.assertEqual(list(ms.iter_files_at_rank(1)), []) |
1214 | + self.assertEqual(list(ms.iter_files_at_rank(2)), []) |
1215 | + self.assertEqual(list(ms.iter_files_at_rank(3)), []) |
1216 | + self.assertEqual(list(ms.iter_files_at_rank(4)), []) |
1217 | + self.assertEqual(list(ms.iter_files_at_rank(5)), []) |
1218 | |
1219 | - # Create rank=(0 through 6) test data: |
1220 | - ids = tuple(random_file_id() for i in range(7)) |
1221 | + # Create rank=(0 through 5) test data: |
1222 | stores = tuple(random_id() for i in range(3)) |
1223 | - docs = [ |
1224 | + docs_0 = [ |
1225 | { |
1226 | - '_id': ids[0], |
1227 | + '_id': random_file_id(), |
1228 | 'type': 'dmedia/file', |
1229 | 'origin': 'user', |
1230 | 'stored': {}, |
1231 | - }, |
1232 | + } |
1233 | + for i in range(100) |
1234 | + ] |
1235 | + docs_1 = [ |
1236 | { |
1237 | - '_id': ids[1], |
1238 | + '_id': random_file_id(), |
1239 | 'type': 'dmedia/file', |
1240 | 'origin': 'user', |
1241 | 'stored': { |
1242 | stores[0]: {'copies': 0}, |
1243 | }, |
1244 | - }, |
1245 | + } |
1246 | + for i in range(101) |
1247 | + ] |
1248 | + docs_2 = [ |
1249 | { |
1250 | - '_id': ids[2], |
1251 | + '_id': random_file_id(), |
1252 | 'type': 'dmedia/file', |
1253 | 'origin': 'user', |
1254 | 'stored': { |
1255 | stores[0]: {'copies': 1}, |
1256 | }, |
1257 | - }, |
1258 | + } |
1259 | + for i in range(102) |
1260 | + ] |
1261 | + docs_3 = [ |
1262 | { |
1263 | - '_id': ids[3], |
1264 | + '_id': random_file_id(), |
1265 | 'type': 'dmedia/file', |
1266 | 'origin': 'user', |
1267 | 'stored': { |
1268 | stores[0]: {'copies': 1}, |
1269 | stores[1]: {'copies': 0}, |
1270 | }, |
1271 | - }, |
1272 | + } |
1273 | + for i in range(103) |
1274 | + ] |
1275 | + docs_4 = [ |
1276 | { |
1277 | - '_id': ids[4], |
1278 | + '_id': random_file_id(), |
1279 | 'type': 'dmedia/file', |
1280 | 'origin': 'user', |
1281 | 'stored': { |
1282 | stores[0]: {'copies': 1}, |
1283 | stores[1]: {'copies': 1}, |
1284 | }, |
1285 | - }, |
1286 | + } |
1287 | + for i in range(104) |
1288 | + ] |
1289 | + docs_5 = [ |
1290 | { |
1291 | - '_id': ids[5], |
1292 | + '_id': random_file_id(), |
1293 | 'type': 'dmedia/file', |
1294 | 'origin': 'user', |
1295 | 'stored': { |
1296 | @@ -3703,9 +3956,181 @@ |
1297 | stores[1]: {'copies': 1}, |
1298 | stores[2]: {'copies': 0}, |
1299 | }, |
1300 | - }, |
1301 | + } |
1302 | + for i in range(105) |
1303 | + ] |
1304 | + docs = [] |
1305 | + doc_groups = (docs_0, docs_1, docs_2, docs_3, docs_4, docs_5) |
1306 | + for docs_n in doc_groups: |
1307 | + docs.extend(docs_n) |
1308 | + docs_n.sort(key=doc_id) |
1309 | + self.assertEqual(len(docs), 615) |
1310 | + db.save_many(docs) |
1311 | + |
1312 | + # Test that for each rank, we get the expected docs and no duplicates: |
1313 | + for (n, docs_n) in enumerate(doc_groups): |
1314 | + result = list(ms.iter_files_at_rank(n)) |
1315 | + self.assertEqual(len(result), 100 + n) |
1316 | + self.assertNotEqual(result, docs_n) # Due to random.shuffle() |
1317 | + self.assertEqual(sorted(result, key=doc_id), docs_n) |
1318 | + |
1319 | + # Similar to above, except this time we're modifying the docs as they're |
1320 | + # yielded so they're bumped up to rank=6 in the file/rank view: |
1321 | + self.assertEqual(len(doc_groups), 6) |
1322 | + self.assertEqual(db.view('file', 'rank', key=6)['rows'], []) |
1323 | + for (n, docs_n) in enumerate(doc_groups): |
1324 | + result = [] |
1325 | + for doc in ms.iter_files_at_rank(n): |
1326 | + result.append(doc) |
1327 | + new = deepcopy(doc) |
1328 | + new['stored'] = { |
1329 | + stores[0]: {'copies': 1}, |
1330 | + stores[1]: {'copies': 1}, |
1331 | + stores[2]: {'copies': 1}, |
1332 | + } |
1333 | + db.save(new) |
1334 | + self.assertEqual(len(result), 100 + n) |
1335 | + self.assertNotEqual(result, docs_n) # Due to random.shuffle() |
1336 | + self.assertEqual(sorted(result, key=doc_id), docs_n) |
1337 | + self.assertEqual(list(ms.iter_files_at_rank(n)), []) |
1338 | + |
1339 | + # Double check that rank 0 through 5 are still returning no docs: |
1340 | + self.assertEqual(list(ms.iter_files_at_rank(0)), []) |
1341 | + self.assertEqual(list(ms.iter_files_at_rank(1)), []) |
1342 | + self.assertEqual(list(ms.iter_files_at_rank(2)), []) |
1343 | + self.assertEqual(list(ms.iter_files_at_rank(3)), []) |
1344 | + self.assertEqual(list(ms.iter_files_at_rank(4)), []) |
1345 | + self.assertEqual(list(ms.iter_files_at_rank(5)), []) |
1346 | + |
1347 | + # And check that all the docs are still at rank=6 and _rev=2: |
1348 | + ids = sorted(d['_id'] for d in docs) |
1349 | + rows = db.view('file', 'rank', key=6)['rows'] |
1350 | + self.assertEqual(len(rows), 615) |
1351 | + self.assertEqual([r['id'] for r in rows], ids) |
1352 | + for doc in db.get_many(ids): |
1353 | + self.assertEqual(doc['_rev'][:2], '2-') |
1354 | + |
1355 | + def test_iter_files_at_rank_2(self): |
1356 | + """ |
1357 | + Ensure that get_rank() is used to filter out greater than current rank. |
1358 | + """ |
1359 | + db = util.get_db(self.env, True) |
1360 | + ms = metastore.MetaStore(db) |
1361 | + for rank in range(6): |
1362 | + ids = tuple(random_file_id() for i in range(50)) |
1363 | + store_ids = tuple(random_id() for i in range(3)) |
1364 | + docs = [build_file_at_rank(_id, rank, store_ids) for _id in ids] |
1365 | + db.save_many(docs) |
1366 | + dmap = dict((d['_id'], d) for d in docs) |
1367 | + docs.sort(key=doc_id) |
1368 | + |
1369 | + # Test with no modification: |
1370 | + result = list(ms.iter_files_at_rank(rank)) |
1371 | + self.assertEqual(len(result), 50) |
1372 | + self.assertNotEqual(result, docs) # Due to random.shuffle() |
1373 | + self.assertEqual(sorted(result, key=doc_id), docs) |
1374 | + |
1375 | + # Adjust 17 files to rank+1 after the first doc is yielded: |
1376 | + include = None |
1377 | + result = [] |
1378 | + for doc in ms.iter_files_at_rank(rank): |
1379 | + result.append(doc) |
1380 | + if include is None: |
1381 | + include = {doc['_id']} |
1382 | + remaining = set(ids) - include |
1383 | + remove = random.sample(remaining, 17) |
1384 | + include.update(remaining - set(remove)) |
1385 | + rdocs = [dmap[_id] for _id in remove] |
1386 | + for rdoc in rdocs: |
1387 | + rdoc['stored'] = build_stored_at_rank(rank + 1, store_ids) |
1388 | + self.assertEqual(metastore.get_rank(rdoc), rank + 1) |
1389 | + db.save_many(rdocs) |
1390 | + expected = [dmap[_id] for _id in include] |
1391 | + expected.sort(key=doc_id) |
1392 | + self.assertEqual(len(expected), 33) |
1393 | + self.assertEqual(len(result), 33) |
1394 | + self.assertNotEqual(result, expected) # Due to random.shuffle() |
1395 | + self.assertEqual(sorted(result, key=doc_id), expected) |
1396 | + |
1397 | + # Now check rank+1, unless we're at rank=5: |
1398 | + if rank < 5: |
1399 | + rdocs.sort(key=doc_id) |
1400 | + result = list(ms.iter_files_at_rank(rank + 1)) |
1401 | + self.assertEqual(len(result), 17) |
1402 | + self.assertNotEqual(result, rdocs) # Due to random.shuffle() |
1403 | + self.assertEqual(sorted(result, key=doc_id), rdocs) |
1404 | + # Clean up for rank+1: |
1405 | + for rdoc in rdocs: |
1406 | + rdoc['_deleted'] = True |
1407 | + db.save_many(rdocs) |
1408 | + self.assertEqual(list(ms.iter_files_at_rank(rank + 1)), []) |
1409 | + |
1410 | + def test_iter_fragile_files(self): |
1411 | + db = util.get_db(self.env, True) |
1412 | + |
1413 | + # Test with a mocked MetaStore.iter_files_at_rank(): |
1414 | + class Mocked(metastore.MetaStore): |
1415 | + def __init__(self, db, log_db=None): |
1416 | + super().__init__(db, log_db) |
1417 | + self._calls = [] |
1418 | + self._ranks = tuple( |
1419 | + tuple(random_id() for i in range(25)) |
1420 | + for rank in range(6) |
1421 | + ) |
1422 | + |
1423 | + def iter_files_at_rank(self, rank): |
1424 | + assert isinstance(rank, int) |
1425 | + assert 0 <= rank <= 5 |
1426 | + self._calls.append(rank) |
1427 | + for _id in self._ranks[rank]: |
1428 | + yield _id |
1429 | + |
1430 | + mocked = Mocked(db) |
1431 | + expected = [] |
1432 | + for ids in mocked._ranks: |
1433 | + expected.extend(ids) |
1434 | + self.assertEqual(list(mocked.iter_fragile_files()), expected) |
1435 | + self.assertEqual(mocked._calls, [0, 1, 2, 3, 4, 5]) |
1436 | + |
1437 | + # Now do a live test: |
1438 | + ms = metastore.MetaStore(db) |
1439 | + self.assertEqual(list(ms.iter_fragile_files()), []) |
1440 | + store_ids = tuple(random_id() for i in range(3)) |
1441 | + docs = [ |
1442 | + build_file_at_rank(random_file_id(), rank, store_ids) |
1443 | + for rank in range(7) |
1444 | + ] |
1445 | + db.save_many(docs) |
1446 | + self.assertEqual(list(ms.iter_fragile_files()), docs[:-1]) |
1447 | + for doc in docs: |
1448 | + doc['_deleted'] = True |
1449 | + db.save_many(docs) |
1450 | + self.assertEqual(list(ms.iter_fragile_files()), []) |
1451 | + |
1452 | + # Test pushing up through ranks: |
1453 | + docs = [ |
1454 | + build_file_at_rank(random_file_id(), 0, store_ids) |
1455 | + for i in range(100) |
1456 | + ] |
1457 | + db.save_many(docs) |
1458 | + docs.sort(key=doc_id) |
1459 | + for rank in range(6): |
1460 | + result = list(ms.iter_fragile_files()) |
1461 | + self.assertEqual(len(result), 100) |
1462 | + self.assertNotEqual(result, docs) # Due to random.shuffle() |
1463 | + self.assertEqual(sorted(result, key=doc_id), docs) |
1464 | + for doc in docs: |
1465 | + doc['stored'] = build_stored_at_rank(rank + 1, store_ids) |
1466 | + db.save_many(docs) |
1467 | + |
1468 | + def test_wait_for_fragile_files(self): |
1469 | + db = util.get_db(self.env, True) |
1470 | + ms = metastore.MetaStore(db) |
1471 | + |
1472 | + stores = tuple(random_id() for i in range(3)) |
1473 | + docs = [ |
1474 | { |
1475 | - '_id': ids[6], |
1476 | + '_id': random_file_id(), |
1477 | 'type': 'dmedia/file', |
1478 | 'origin': 'user', |
1479 | 'stored': { |
1480 | @@ -3713,104 +4138,57 @@ |
1481 | stores[1]: {'copies': 1}, |
1482 | stores[2]: {'copies': 1}, |
1483 | }, |
1484 | - }, |
1485 | + } |
1486 | + for i in range(4) |
1487 | ] |
1488 | db.save_many(docs) |
1489 | - |
1490 | - # We should get docs[0:6]: |
1491 | - self.assertEqual(list(ms.iter_fragile()), docs[0:-1]) |
1492 | - |
1493 | - # Docs should not be changed: |
1494 | - self.assertEqual(db.get_many(ids), docs) |
1495 | - |
1496 | - def test_iter_actionable_fragile(self): |
1497 | + last_seq = db.get()['update_seq'] |
1498 | + for doc in docs: |
1499 | + del doc['stored'][stores[0]] |
1500 | + db.save(doc) |
1501 | + result = ms.wait_for_fragile_files(last_seq) |
1502 | + self.assertEqual(result, { |
1503 | + 'last_seq': last_seq + 1, |
1504 | + 'results': [ |
1505 | + { |
1506 | + 'changes': [{'rev': doc['_rev']}], |
1507 | + 'doc': doc, |
1508 | + 'id': doc['_id'], |
1509 | + 'seq': last_seq + 1, |
1510 | + } |
1511 | + ], |
1512 | + }) |
1513 | + last_seq = result['last_seq'] |
1514 | + |
1515 | + def test_iter_preempt_files(self): |
1516 | db = util.get_db(self.env, True) |
1517 | ms = metastore.MetaStore(db) |
1518 | |
1519 | - store_id1 = random_id() |
1520 | - store_id2 = random_id() |
1521 | - store_id3 = random_id() |
1522 | - store_id4 = random_id() |
1523 | - empty = frozenset() |
1524 | - one = frozenset([store_id1]) |
1525 | - two = frozenset([store_id1, store_id2]) |
1526 | - three = frozenset([store_id1, store_id2, store_id3]) |
1527 | - |
1528 | - id1 = random_file_id() |
1529 | - id2 = random_file_id() |
1530 | - id3 = random_file_id() |
1531 | - doc1 = { |
1532 | - '_id': id1, |
1533 | - 'type': 'dmedia/file', |
1534 | - 'origin': 'user', |
1535 | - 'stored': { |
1536 | - store_id1: {'copies': 0}, |
1537 | - }, |
1538 | - } |
1539 | - doc2 = { |
1540 | - '_id': id2, |
1541 | - 'type': 'dmedia/file', |
1542 | - 'origin': 'user', |
1543 | - 'stored': { |
1544 | - store_id1: {'copies': 0}, |
1545 | - store_id4: {'copies': 1}, |
1546 | - }, |
1547 | - } |
1548 | - doc3 = { |
1549 | - '_id': id3, |
1550 | - 'type': 'dmedia/file', |
1551 | - 'origin': 'user', |
1552 | - 'stored': { |
1553 | - store_id1: {'copies': 1}, |
1554 | - store_id2: {'copies': 1}, |
1555 | - }, |
1556 | - } |
1557 | - |
1558 | - # Test when no files are in the library: |
1559 | - self.assertEqual(list(ms.iter_actionable_fragile(empty)), []) |
1560 | - self.assertEqual(list(ms.iter_actionable_fragile(one)), []) |
1561 | - self.assertEqual(list(ms.iter_actionable_fragile(two)), []) |
1562 | - self.assertEqual(list(ms.iter_actionable_fragile(three)), []) |
1563 | - |
1564 | - # All 3 docs should be included: |
1565 | - db.save_many([doc1, doc2, doc3]) |
1566 | - self.assertEqual(list(ms.iter_actionable_fragile(three)), [ |
1567 | - (doc1, set([store_id1])), |
1568 | - (doc2, set([store_id1, store_id4])), |
1569 | - (doc3, set([store_id1, store_id2])), |
1570 | - ]) |
1571 | - |
1572 | - # If only store_id1, store_id2 are connected, doc3 shouldn't be |
1573 | - # actionable: |
1574 | - self.assertEqual(list(ms.iter_actionable_fragile(two)), [ |
1575 | - (doc1, set([store_id1])), |
1576 | - (doc2, set([store_id1, store_id4])), |
1577 | - ]) |
1578 | - |
1579 | - # All files have a copy in store_id1, so nothing should be returned: |
1580 | - self.assertEqual(list(ms.iter_actionable_fragile(one)), []) |
1581 | - |
1582 | -        # If doc1 was only stored on a non-connected store:
1583 | - doc1['stored'] = { |
1584 | - store_id4: {'copies': 1}, |
1585 | - } |
1586 | - db.save(doc1) |
1587 | - self.assertEqual(list(ms.iter_actionable_fragile(one)), [ |
1588 | - (doc1, set([store_id4])) |
1589 | - ]) |
1590 | - |
1591 | -        # If doc2 has sufficient durability, it shouldn't be included, even though
1592 | - # there is a free drive where a copy could be created: |
1593 | - doc2['stored'] = { |
1594 | - store_id1: {'copies': 1}, |
1595 | - store_id2: {'copies': 1}, |
1596 | - store_id4: {'copies': 1}, |
1597 | - } |
1598 | - db.save(doc2) |
1599 | - self.assertEqual(list(ms.iter_actionable_fragile(three)), [ |
1600 | - (doc1, set([store_id4])), |
1601 | - (doc3, set([store_id1, store_id2])), |
1602 | - ]) |
1603 | +            # When the library is empty:
1604 | + self.assertEqual(list(ms.iter_preempt_files()), []) |
1605 | + |
1606 | + # With live data: |
1607 | + store_ids = tuple(random_id() for i in range(3)) |
1608 | + docs = [ |
1609 | + build_file_at_rank(random_file_id(), 6, store_ids) |
1610 | + for i in range(107) |
1611 | + ] |
1612 | + base = int(time.time()) |
1613 | + for (i, doc) in enumerate(docs): |
1614 | + doc['atime'] = base - i |
1615 | + db.save_many(docs) |
1616 | + expected = docs[0:100] |
1617 | + result = list(ms.iter_preempt_files()) |
1618 | + self.assertEqual(len(result), 100) |
1619 | + self.assertNotEqual(result, expected) # Due to random.shuffle() |
1620 | + result.sort(key=lambda d: d['atime'], reverse=True) |
1621 | + self.assertEqual(result, expected) |
1622 | + |
1623 | + # Make sure files are excluded when durability isn't 3: |
1624 | + for doc in docs: |
1625 | + doc['stored'] = build_stored_at_rank(5, store_ids) |
1626 | + db.save_many(docs) |
1627 | + self.assertEqual(list(ms.iter_preempt_files()), []) |
1628 | |
1629 | def test_reclaim(self): |
1630 | # FIXME: Till we have a nice way of mocking FileStore.statvfs(), this is |
1631 | |
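The iter_files_at_rank() tests above pin down a page-resumption pattern over the file/rank view: within a single key, rows sort by doc _id, so iteration can resume strictly after the last _id processed even while docs are moving between ranks. A minimal sketch of that pattern, assuming the microfiber-style db.view() keyword arguments the tests use; page_size is a hypothetical parameter, and this is only a sketch of the query shape, not the branch's actual implementation:

    import random

    def iter_files_at_rank(db, rank, page_size=50):
        # Page through the file/rank view for one rank, resuming each
        # request strictly after the last doc _id already yielded.
        kw = {'key': rank, 'limit': page_size, 'include_docs': True}
        while True:
            rows = db.view('file', 'rank', **kw)['rows']
            if not rows:
                break
            docs = [row['doc'] for row in rows]
            random.shuffle(docs)  # the shuffle the tests above assert
            for doc in docs:
                yield doc
            kw['startkey_docid'] = rows[-1]['id']
            kw['skip'] = 1  # don't re-yield the row we resumed from

The skip=1 plus startkey_docid combination is the standard CouchDB idiom for resuming within a single key without re-reading the boundary row.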
1632 | === modified file 'dmedia/tests/test_views.py' |
1633 | --- dmedia/tests/test_views.py 2013-10-12 07:19:16 +0000 |
1634 | +++ dmedia/tests/test_views.py 2013-11-21 03:22:50 +0000 |
1635 | @@ -1253,6 +1253,155 @@ |
1636 | {'rows': [], 'offset': 0, 'total_rows': 0}, |
1637 | ) |
1638 | |
1639 | + ###################################################################### |
1640 | + # 3rd, test assumptions about how "startkey_docid" and "key" interact: |
1641 | + db = Database('baz', self.env) |
1642 | + db.put(None) |
1643 | + design = self.build_view('rank') |
1644 | + db.save(design) |
1645 | + |
1646 | + # Create rank=(0 through 6) test data: |
1647 | + ranks = tuple( |
1648 | + tuple(random_file_id() for j in range(17 + i)) |
1649 | + for i in range(7) |
1650 | + ) |
1651 | + sorted_ranks = tuple( |
1652 | + tuple(sorted(rank_n)) for rank_n in ranks |
1653 | + ) |
1654 | + flattened = [] |
1655 | + for (n, rank_n) in enumerate(sorted_ranks): |
1656 | + flattened.extend((n, _id) for _id in rank_n) |
1657 | + flattened = tuple(flattened) |
1658 | + self.assertEqual(len(flattened), 140) |
1659 | + |
1660 | + stores = tuple(random_id() for i in range(3)) |
1661 | + docs = [] |
1662 | + docs.extend( |
1663 | + { |
1664 | + '_id': _id, |
1665 | + 'type': 'dmedia/file', |
1666 | + 'origin': 'user', |
1667 | + 'stored': {}, |
1668 | + } |
1669 | + for _id in ranks[0] |
1670 | + ) |
1671 | + docs.extend( |
1672 | + { |
1673 | + '_id': _id, |
1674 | + 'type': 'dmedia/file', |
1675 | + 'origin': 'user', |
1676 | + 'stored': { |
1677 | + stores[0]: {'copies': 0}, |
1678 | + }, |
1679 | + } |
1680 | + for _id in ranks[1] |
1681 | + ) |
1682 | + docs.extend( |
1683 | + { |
1684 | + '_id': _id, |
1685 | + 'type': 'dmedia/file', |
1686 | + 'origin': 'user', |
1687 | + 'stored': { |
1688 | + stores[0]: {'copies': 1}, |
1689 | + }, |
1690 | + } |
1691 | + for _id in ranks[2] |
1692 | + ) |
1693 | + docs.extend( |
1694 | + { |
1695 | + '_id': _id, |
1696 | + 'type': 'dmedia/file', |
1697 | + 'origin': 'user', |
1698 | + 'stored': { |
1699 | + stores[0]: {'copies': 1}, |
1700 | + stores[1]: {'copies': 0}, |
1701 | + }, |
1702 | + } |
1703 | + for _id in ranks[3] |
1704 | + ) |
1705 | + docs.extend( |
1706 | + { |
1707 | + '_id': _id, |
1708 | + 'type': 'dmedia/file', |
1709 | + 'origin': 'user', |
1710 | + 'stored': { |
1711 | + stores[0]: {'copies': 1}, |
1712 | + stores[1]: {'copies': 1}, |
1713 | + }, |
1714 | + } |
1715 | + for _id in ranks[4] |
1716 | + ) |
1717 | + docs.extend( |
1718 | + { |
1719 | + '_id': _id, |
1720 | + 'type': 'dmedia/file', |
1721 | + 'origin': 'user', |
1722 | + 'stored': { |
1723 | + stores[0]: {'copies': 1}, |
1724 | + stores[1]: {'copies': 1}, |
1725 | + stores[2]: {'copies': 0}, |
1726 | + }, |
1727 | + } |
1728 | + for _id in ranks[5] |
1729 | + ) |
1730 | + docs.extend( |
1731 | + { |
1732 | + '_id': _id, |
1733 | + 'type': 'dmedia/file', |
1734 | + 'origin': 'user', |
1735 | + 'stored': { |
1736 | + stores[0]: {'copies': 1}, |
1737 | + stores[1]: {'copies': 1}, |
1738 | + stores[2]: {'copies': 1}, |
1739 | + }, |
1740 | + } |
1741 | + for _id in ranks[6] |
1742 | + ) |
1743 | + self.assertEqual(len(docs), 140) |
1744 | + db.save_many(docs) |
1745 | + |
1746 | + # Test that sorting is being done by (key, _id): |
1747 | + self.assertEqual( |
1748 | + db.view('file', 'rank'), |
1749 | + { |
1750 | + 'offset': 0, |
1751 | + 'total_rows': 140, |
1752 | + 'rows': [ |
1753 | + {'key': n, 'id': _id, 'value': None} |
1754 | + for (n, _id) in flattened |
1755 | + ], |
1756 | + }, |
1757 | + ) |
1758 | + |
1759 | + # Test that sorting is being done by _id within a single key, then test |
1760 | + # that we can use "startkey_docid" as expected: |
1761 | + offset = 0 |
1762 | + for (n, rank_n) in enumerate(sorted_ranks): |
1763 | + self.assertEqual( |
1764 | + db.view('file', 'rank', key=n), |
1765 | + { |
1766 | + 'offset': offset, |
1767 | + 'total_rows': 140, |
1768 | + 'rows': [ |
1769 | + {'key': n, 'id': _id, 'value': None} |
1770 | + for _id in rank_n |
1771 | + ], |
1772 | + }, |
1773 | + ) |
1774 | + for i in range(len(rank_n)): |
1775 | + self.assertEqual( |
1776 | + db.view('file', 'rank', key=n, startkey_docid=rank_n[i]), |
1777 | + { |
1778 | + 'offset': offset + i, |
1779 | + 'total_rows': 140, |
1780 | + 'rows': [ |
1781 | + {'key': n, 'id': _id, 'value': None} |
1782 | + for _id in rank_n[i:] |
1783 | + ], |
1784 | + }, |
1785 | + ) |
1786 | + offset += len(rank_n) |
1787 | + |
1788 | def test_fragile(self): |
1789 | db = Database('foo', self.env) |
1790 | db.put(None) |
1791 | |
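The seven 'stored' shapes above trace ranks 0 through 6 and encode the rank formula the file/rank view and metastore.get_rank() must agree on: one point for each store a file is on, plus one for each durable copy claimed. Read straight off these fixtures (an inference from the test data, not a quote of dmedia/metastore.py):

    def get_rank(doc):
        # Inferred from the rank 0..6 fixtures above: rank is the
        # number of stores holding the file plus its total claimed
        # copies, with each term capped at 3.
        stored = doc.get('stored')
        if not isinstance(stored, dict):
            return 0
        copies = 0
        for value in stored.values():
            n = value.get('copies') if isinstance(value, dict) else None
            if isinstance(n, int) and n > 0:
                copies += n
        return min(3, len(stored)) + min(3, copies)

Under this reading, rank=6 means on 3 stores with 3 good copies, which is exactly the population the file/preempt view below selects from.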
1792 | === modified file 'dmedia/views.py' |
1793 | --- dmedia/views.py 2013-10-12 07:19:16 +0000 |
1794 | +++ dmedia/views.py 2013-11-21 03:22:50 +0000 |
1795 | @@ -110,6 +110,10 @@ |
1796 | file_copies = """ |
1797 | function(doc) { |
1798 | if (doc.type == 'dmedia/file' && doc.origin == 'user') { |
1799 | + if (typeof doc.stored != 'object' || isArray(doc.stored)) { |
1800 | + emit(0, null); |
1801 | + return; |
1802 | + } |
1803 | var total = 0; |
1804 | var key, copies; |
1805 | for (key in doc.stored) { |
1806 | @@ -146,6 +150,27 @@ |
1807 | } |
1808 | """ |
1809 | |
1810 | +file_preempt = """ |
1811 | +function(doc) { |
1812 | + if (doc.type == 'dmedia/file' && doc.origin == 'user') { |
1813 | + if (typeof doc.stored != 'object' || isArray(doc.stored)) { |
1814 | + return; |
1815 | + } |
1816 | + var total = 0; |
1817 | + var key, copies; |
1818 | + for (key in doc.stored) { |
1819 | + copies = doc.stored[key].copies; |
1820 | + if (typeof copies == 'number' && copies > 0) { |
1821 | + total += copies; |
1822 | + } |
1823 | + } |
1824 | + if (total == 3) { |
1825 | + emit(doc.atime, null); |
1826 | + } |
1827 | + } |
1828 | +} |
1829 | +""" |
1830 | + |
1831 | file_fragile = """ |
1832 | function(doc) { |
1833 | if (doc.type == 'dmedia/file' && doc.origin == 'user') { |
1834 | @@ -302,7 +327,8 @@ |
1835 | 'stored': {'map': file_stored, 'reduce': _stats}, |
1836 | 'nonzero': {'map': file_nonzero}, |
1837 | 'copies': {'map': file_copies}, |
1838 | - 'rank': {'map': file_rank}, |
1839 | + 'rank': {'map': file_rank, 'reduce': _count}, |
1840 | + 'preempt': {'map': file_preempt}, |
1841 | 'fragile': {'map': file_fragile}, |
1842 | 'downgrade-by-mtime': {'map': file_downgrade_by_mtime}, |
1843 | 'downgrade-by-verified': {'map': file_downgrade_by_verified}, |
1844 | @@ -332,6 +358,16 @@ |
1845 | } |
1846 | """ |
1847 | |
1848 | +store_bytes_avail = """ |
1849 | +function(doc) { |
1850 | + if (doc.type == 'dmedia/store') { |
1851 | + if (typeof doc.bytes_avail == 'number') { |
1852 | + emit(doc.bytes_avail, null); |
1853 | + } |
1854 | + } |
1855 | +} |
1856 | +""" |
1857 | + |
1858 | store_drive_serial = """ |
1859 | function(doc) { |
1860 | if (doc.type == 'dmedia/store') { |
1861 | @@ -344,6 +380,7 @@ |
1862 | '_id': '_design/store', |
1863 | 'views': { |
1864 | 'atime': {'map': store_atime}, |
1865 | + 'bytes_avail': {'map': store_bytes_avail}, |
1866 | 'drive_serial': {'map': store_drive_serial}, |
1867 | }, |
1868 | } |
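Together with test_iter_preempt_files() above, the new file/preempt view gives the preemptive copy-increasing pass a cheap most-recently-used query over files already at durability 3: the map function emits doc.atime, so a descending read with a limit returns the freshest candidates first. A hedged sketch of that query shape (limit=100 matches the test; the branch's real MetaStore.iter_preempt_files() is only constrained by these tests up to the shuffle they assert):

    import random

    def iter_preempt_files(db, limit=100):
        # file/preempt emits doc.atime only when total copies == 3,
        # so descending order yields the most recently accessed files.
        rows = db.view('file', 'preempt',
                       descending=True, limit=limit, include_docs=True)['rows']
        docs = [row['doc'] for row in rows]
        random.shuffle(docs)  # the shuffle the tests above assert
        return docs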
Looks good, let's work on completely resolving https://bugs.launchpad.net/bugs/1247530 now!