Merge lp:~jderose/dmedia/fix-1247530 into lp:dmedia

Proposed by Jason Gerard DeRose
Status: Merged
Merged at revision: 760
Proposed branch: lp:~jderose/dmedia/fix-1247530
Merge into: lp:dmedia
Diff against target: 1868 lines (+1306/-230)
8 files modified
dmedia/core.py (+220/-55)
dmedia/local.py (+9/-9)
dmedia/metastore.py (+219/-53)
dmedia/tests/test_core.py (+144/-0)
dmedia/tests/test_local.py (+37/-0)
dmedia/tests/test_metastore.py (+490/-112)
dmedia/tests/test_views.py (+149/-0)
dmedia/views.py (+38/-1)
To merge this branch: bzr merge lp:~jderose/dmedia/fix-1247530
Reviewer: David Jordan (Approve)
Review via email: mp+196045@code.launchpad.net

Description of the change

For background, please see this bug:
https://bugs.launchpad.net/dmedia/+bug/1253494

Changes include:

 * Replace the core._vigilance_worker() function with a new core.Vigilance class that breaks the work down into simple methods that can be more easily unit tested; also adds a number of said unit tests, though we're still not at 100% coverage

 * The copy-increasing behavior (Vigilance) now uses an IO cost accounting model and will take the least expensive route (IO-wise) to get all files at rank=1 up to rank=2, then all files at rank=2 up to rank=3, and so on; as a result, Dmedia now converges much more quickly; the biggest gain comes from Vigilance now simply verifying a downgraded file where it previously would have created a new copy (verifying is the cheapest option in the IO cost accounting model; see the rank/cost sketch after this list)

 * When downloading from a local peer, Vigilance now uses the dmedia/machine docs to determine which FileStores are connected to each peer, and thereby which peers should have a particular file; this is much more efficient than the previous approach, which was to make a HEAD request to each peer until the file was found; this likewise means Dmedia now converges more quickly when you have multiple devices in your Dmedia library (especially in the case of 3 or more peers).

 * LocalStores.filter_by_avail() and LocalStores.find_dst_store() now take a *threshold* argument for the available bytes to maintain, rather than relying on the hard-coded local.MIN_FREE_SPACE constant; this was needed because the new preemptive copy-increasing uses a different threshold than the normal copy-increasing (see the threshold sketch after this list)

 * In the metastore module, add a MIN_BYTES_FREE constant (which replaces MIN_FREE_SPACE), and rename RECLAIM_BYTES to the clearer and more consistent MAX_BYTES_FREE; add unit tests for these constants, in particular to enforce that (MAX_BYTES_FREE >= 2 * MIN_BYTES_FREE)

 * Vigilance now does preemptive copy-increasing up to MAX_BYTES_FREE: in other words, a 4th copy will be speculatively created, but only onto a FileStore that can still maintain this larger available-space threshold; copies are created in descending order of atime; in tandem with MetaStore.reclaim_all(), this change will *usually* create enough shuffling "flow" to prevent the non-convergent scenario discussed in lp:1247530; however, this preemptive copy-increasing does *not* provide a strong, bounded guarantee that lp:1247530 won't happen, so we don't yet consider that bug fixed!

 * Replace MetaStore.iter_fragile() with the simpler, more focused MetaStore.iter_files_at_rank(); this new method steps through the files at each rank 50 docs at a time (whereas before it would retrieve the IDs for every file at the rank in question, even if that meant retrieving, say, a million rows); however, similar order randomization is still done so that different peers don't constantly step on each other's toes

 * Add a get_rank() function, which MetaStore.iter_files_at_rank() uses to do present-moment filtering as it retrieves each doc; previously this filtering was done by _vigilance_worker()

 * Add a MetaStore.wait_for_fragile_files() method, which implements a single step of the CouchDB _changes event loop that used to live inside MetaStore.iter_fragile(); importantly, this makes the code easy to unit test, and said unit test was added ;)

 * Replace MetaStore.iter_actionable_fragile() with MetaStore.iter_fragile_files(), which now simply processes rank=0 through rank=5 and does no filtering; this change was needed because Vigilance will now verify downgraded files even when there isn't a free FileStore where a new copy could be created

 * Add MetaStore.iter_preempt_files() to drive the preemptive copy-increasing; this does order randomization and present-moment filtering similar to MetaStore.iter_files_at_rank()

 * Add get_copies() function, which MetaStore.iter_preempt_files() uses to do present-moment filtering as it retrieves each doc

 * Add a new file/preempt view to drive the preemptive copy-increasing
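
For reference, here is a minimal, self-contained sketch of the rank and IO cost model described above. It is an illustration only, not the real metastore.get_rank() (which additionally coerces malformed doc['stored'] entries in place; see the diff below):

    # Simplified rank model: rank = physical locations + durability, each capped at 3.
    def rank(doc):
        stored = doc.get('stored', {})
        drives = len(stored)                                        # physical copies
        copies = sum(v.get('copies', 0) for v in stored.values())   # assumed durability
        return min(3, drives) + min(3, copies)

    # One downgraded copy (copies=0) on one drive:
    print(rank({'stored': {'3' * 24: {'copies': 0}}}))   # 1

    # Verifying that copy costs 1 IO unit (one read) and lifts the file to rank=2;
    # creating a brand new copy costs 2 IO units (one read plus one write) for the
    # same +1 rank, which is why Vigilance.up_rank() prefers verify, then local
    # copy, then download.
    print(rank({'stored': {'3' * 24: {'copies': 1}}}))   # 2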

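And a small sketch of the new *threshold* semantics; the constants match dmedia/metastore.py in the diff below, while the drive and file sizes are made-up example values:

    GB = 10 ** 9
    MIN_BYTES_FREE = 16 * GB   # threshold for normal copy-increasing
    MAX_BYTES_FREE = 64 * GB   # larger threshold for preemptive copy-increasing

    # A FileStore is an eligible destination only if writing the file still leaves
    # at least `threshold` bytes available, as in LocalStores.filter_by_avail() and
    # find_dst_store(), which check avail >= size + threshold.
    def eligible(avail, size, threshold):
        return avail >= size + threshold

    avail = 60 * GB   # hypothetical free space on a drive
    size = 5 * GB     # hypothetical file size
    print(eligible(avail, size, MIN_BYTES_FREE))   # True: normal copy allowed
    print(eligible(avail, size, MAX_BYTES_FREE))   # False: no preemptive 4th copy here
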
Revision history for this message
David Jordan (dmj726) wrote :

Looks good, let's work on completely resolving https://bugs.launchpad.net/bugs/1247530 now!

review: Approve

Preview Diff

1=== modified file 'dmedia/core.py'
2--- dmedia/core.py 2013-10-24 02:59:18 +0000
3+++ dmedia/core.py 2013-11-21 03:22:50 +0000
4@@ -53,7 +53,8 @@
5 from dmedia import util, schema, views
6 from dmedia.client import Downloader, get_client, build_ssl_context
7 from dmedia.metastore import MetaStore, create_stored, get_dict
8-from dmedia.local import LocalStores, FileNotLocal, MIN_FREE_SPACE
9+from dmedia.metastore import MIN_BYTES_FREE, MAX_BYTES_FREE
10+from dmedia.local import LocalStores, FileNotLocal
11
12
13 log = logging.getLogger()
14@@ -230,65 +231,221 @@
15 log.exception('Error updating project stats for %r', project_id)
16
17
18-def _vigilance_worker(env, ssl_config):
19- """
20- Run the event-based copy-increasing loop to maintain file durability.
21- """
22- db = util.get_db(env)
23- ms = MetaStore(db)
24-
25- local_stores = ms.get_local_stores()
26- if len(local_stores) == 0:
27- log.warning('No connected local stores, cannot increase copies')
28- return
29- connected = frozenset(local_stores.ids)
30- log.info('Connected %r', connected)
31-
32- clients = []
33- peers = ms.get_local_peers()
34- if peers:
35- ssl_context = build_ssl_context(ssl_config)
36- for (peer_id, info) in peers.items():
37- url = info['url']
38- log.info('Peer %s at %s', peer_id, url)
39- clients.append(get_client(url, ssl_context))
40- else:
41- log.info('No known peers on local network')
42-
43- for (doc, stored) in ms.iter_actionable_fragile(connected, True):
44+def get_downgraded(doc):
45+ downgraded = []
46+ for (key, value) in doc['stored'].items():
47+ copies = value['copies']
48+ verified = value.get('verified')
49+ if copies == 0 and not isinstance(verified, int):
50+ downgraded.append(key)
51+ return downgraded
52+
53+
54+class Vigilance:
55+ def __init__(self, ms, ssl_config):
56+ self.ms = ms
57+ self.stores = ms.get_local_stores()
58+ for fs in self.stores:
59+ log.info('Vigilance: local store: %r', fs)
60+ self.local = frozenset(self.stores.ids)
61+ self.clients = {}
62+ self.store_to_peer = {}
63+ remote = []
64+ peers = ms.get_local_peers()
65+ if peers:
66+ ssl_context = build_ssl_context(ssl_config)
67+ for (peer_id, info) in peers.items():
68+ url = info['url']
69+ log.info('Vigilance: peer %s at %s', peer_id, url)
70+ self.clients[peer_id] = get_client(url, ssl_context)
71+ for doc in ms.db.get_many(list(peers)):
72+ if doc is not None:
73+ for store_id in get_dict(doc, 'stores'):
74+ if is_store_id(store_id):
75+ remote.append(store_id)
76+ self.store_to_peer[store_id] = doc['_id']
77+ assert doc['_id'] in peers
78+ self.remote = frozenset(remote)
79+
80+ def run(self):
81+ last_seq = self.process_backlog()
82+ log.info('Vigilance: processed backlog as of %r', last_seq)
83+ self.process_preempt()
84+ self.run_event_loop(last_seq)
85+
86+ def process_backlog(self):
87+ for doc in self.ms.iter_fragile_files():
88+ self.wrap_up_rank(doc)
89+ return self.ms.db.get()['update_seq']
90+
91+ def process_preempt(self):
92+ for doc in self.ms.iter_preempt_files():
93+ self.wrap_up_rank(doc, threshold=MAX_BYTES_FREE)
94+
95+ def run_event_loop(self, last_seq):
96+ log.info('Vigilance: starting event loop at %d', last_seq)
97+ while True:
98+ result = self.ms.wait_for_fragile_files(last_seq)
99+ last_seq = result['last_seq']
100+ for row in result['results']:
101+ self.wrap_up_rank(row['doc'])
102+
103+ def wrap_up_rank(self, doc, threshold=MIN_BYTES_FREE):
104+ try:
105+ return self.up_rank(doc, threshold)
106+ except Exception:
107+ log.exception('Error calling Vigilance.up_rank() for %r', doc)
108+
109+ def up_rank(self, doc, threshold):
110+ """
111+ Implements the rank-increasing decision tree.
112+
113+ There are 4 possible actions:
114+
115+ 1) Verify a local copy currently in a downgraded state
116+
117+ 2) Copy from a local FileStore to another local FileStore
118+
119+ 3) Download from a remote peer to a local FileStore
120+
121+ 4) Do nothing as no rank-increasing action is possible
122+
123+ This is a high-level tree based on set operations. The action taken
124+ here may not actually be possible because this method doesn't consider
125+ whether there is a local FileStore with enough available space, which
126+ when there isn't, actions (2) and (3) wont be possible.
127+
128+ We use a simple IO cost accounting model: reading a copy costs one unit,
129+ and writing copy likewise costs one unit. For example, consider these
130+ three operations:
131+
132+ ==== ==============================================
133+ Cost Action
134+ ==== ==============================================
135+ 1 Verify a copy (1 read unit)
136+ 2 Create a copy (1 read unit, 1 write unit)
137+ 3 Create two copies (1 read unit, 2 write units)
138+ ==== ==============================================
139+
140+ This method will take the least expensive route (in IO cost units) that
141+ will increase the file rank by at least 1.
142+
143+ It's tempting to look for actions with a lower cost to benefit ratio,
144+ even when the cost is higher. For example, consider these actions:
145+
146+ ==== ===== ===== ========================
147+ Cost +Rank Ratio Action
148+ ==== ===== ===== ========================
149+ 1 1 1.00 Verify a downgraded copy
150+ 2 2 1.00 Create a copy
151+ 3 4 0.75 Create two copies
152+ ==== ===== ===== ========================
153+
154+ In this sense, it's a better deal to create two new copies (which is the
155+ action Dmedia formerly would take). However, because greater IO
156+ resources are consumed, this means it will necessarily delay acting on
157+ other equally fragile files (other files at the current rank being
158+ processed).
159+
160+ Dmedia will now take the cheapest route to getting all files at rank=1
161+ up to at least rank=2, then getting all files at rank=2 up to at least
162+ rank=3, and so on.
163+
164+ Another interesting "good deal" is creating new copies by reading from
165+ a local downgraded copy (because the source file is always verified as
166+ its read):
167+
168+ ==== ===== ===== ========================================
169+ Cost +Rank Ratio Action
170+ ==== ===== ===== ========================================
171+ 1 1 1.00 Verify a downgraded copy
172+ 2 2 1.00 Create a copy
173+ 2 3 0.66 Create a copy from a downgraded copy
174+ 3 4 0.75 Create two copies
175+ 3 5 0.60 Create two copies from a downgraded copy
176+ ==== ===== ===== ========================================
177+
178+ One place where this does make sense is when there is a locally
179+ available file at rank=1 (a single physical copy in a downgraded state),
180+ and a locally connected FileStore with enough free space to create a new
181+ copy. As a state of having only a single physical copy is so dangerous,
182+ it makes sense to bend the rules here.
183+
184+ FIXME: Dmedia doesn't yet do this! Probably the best way to implement
185+ this is for the decision tree here to work as it does, but to add some
186+ special case handling in Vigilance.up_rank_by_verifying(). Assuming the
187+ needed space isn't available on another FileStore, we should still at
188+ least verify the copy.
189+
190+ However, the same will not be done for a file at rank=3 (two physical
191+ copies, one in a downgraded state). In this case the downgraded copy
192+ will simply be verified, using 1 IO unit and increasing the rank to 4.
193+
194+ Note that we use the same cost for a read whether reading from a local
195+ drive or downloading from a peer. Although downloading is cheaper when
196+ looked at only from the perspective of the local node, it has the same
197+ cost when considering the total Dmedia library.
198+
199+ The other peers will likewise be doing their best to address any fragile
200+ files. And furthermore, local network IO is generally a more scarce
201+ resource (especially over WiFi), so we should only download when its
202+ absolutely needed (ie, when no local copy is available).
203+ """
204+ assert isinstance(threshold, int) and threshold > 0
205+ stored = set(doc['stored'])
206+ local = stored.intersection(self.local)
207+ downgraded = local.intersection(get_downgraded(doc))
208+ free = self.local - stored
209+ remote = stored.intersection(self.remote)
210+ if local:
211+ if downgraded:
212+ return self.up_rank_by_verifying(doc, downgraded)
213+ elif free:
214+ return self.up_rank_by_copying(doc, free, threshold)
215+ elif remote:
216+ return self.up_rank_by_downloading(doc, remote, threshold)
217+
218+ def up_rank_by_verifying(self, doc, downgraded):
219+ assert isinstance(downgraded, set)
220+ store_id = downgraded.pop()
221+ fs = self.stores.by_id(store_id)
222+ return self.ms.verify(fs, doc)
223+
224+ def up_rank_by_copying(self, doc, free, threshold):
225+ dst = self.stores.filter_by_avail(free, doc['bytes'], 1, threshold)
226+ if dst:
227+ src = self.stores.choose_local_store(doc)
228+ return self.ms.copy(src, doc, *dst)
229+
230+ def up_rank_by_downloading(self, doc, remote, threshold):
231+ fs = self.stores.find_dst_store(doc['bytes'], threshold)
232+ if fs is None:
233+ return
234+ peer_ids = frozenset(
235+ self.store_to_peer[store_id] for store_id in remote
236+ )
237+ downloader = None
238 _id = doc['_id']
239- copies = sum(v['copies'] for v in doc['stored'].values())
240- if copies >= 3:
241- log.warning('%s already has copies >= 3, skipping', _id)
242- continue
243- size = doc['bytes']
244- local = connected.intersection(stored) # Any local copies?
245- if local:
246- free = connected - stored
247- src = local_stores.choose_local_store(doc)
248- dst = local_stores.filter_by_avail(free, size, 3 - copies)
249- if dst:
250- ms.copy(src, doc, *dst)
251- elif clients:
252- fs = local_stores.find_dst_store(size)
253- if fs is None:
254- log.warning(
255- 'No FileStore with avail space to download %s', _id
256- )
257+ for peer_id in peer_ids:
258+ client = self.clients[peer_id]
259+ if not client.has_file(_id):
260 continue
261- for client in clients:
262- if not client.has_file(_id):
263- continue
264- downloader = Downloader(doc, ms, fs)
265- try:
266- downloader.download_from(client)
267- except Exception:
268- log.exception('Error downloading %s from %s', _id, client)
269+ if downloader is None:
270+ downloader = Downloader(doc, self.ms, fs)
271+ try:
272+ downloader.download_from(client)
273+ except Exception:
274+ log.exception('Error downloading %s from %s', _id, client)
275+ if downloader.download_is_complete():
276+ return downloader.doc
277
278
279 def vigilance_worker(env, ssl_config):
280 try:
281- _vigilance_worker(env, ssl_config)
282+ db = util.get_db(env)
283+ ms = MetaStore(db)
284+ vigilance = Vigilance(ms, ssl_config)
285+ vigilance.run()
286 except Exception:
287 log.exception('Error in vigilance_worker():')
288
289@@ -328,7 +485,11 @@
290
291
292 def is_file_id(_id):
293- return isdb32(_id) and len(_id) == 48
294+ return isinstance(_id, str) and len(_id) == 48 and isdb32(_id)
295+
296+
297+def is_store_id(_id):
298+ return isinstance(_id, str) and len(_id) == 24 and isdb32(_id)
299
300
301 def clean_file_id(_id):
302@@ -531,6 +692,10 @@
303 self.task_manager.requeue_filestore_tasks(tuple(self.stores))
304
305 def restart_vigilance(self):
306+ # FIXME: Core should also restart Vigilance whenever the FileStore
307+ # connected to a peer change. We should do this by monitoring the
308+ # _changes feed for changes to any of the machine docs corresponding to
309+ # the currently visible local peers.
310 self.task_manager.restart_vigilance()
311
312 def get_auto_format(self):
313
314=== modified file 'dmedia/local.py'
315--- dmedia/local.py 2013-10-24 02:21:03 +0000
316+++ dmedia/local.py 2013-11-21 03:22:50 +0000
317@@ -33,7 +33,6 @@
318
319
320 log = logging.getLogger()
321-MIN_FREE_SPACE = 16 * 1024**3 # 8 GiB min free space
322
323
324 class NoSuchFile(Exception):
325@@ -170,24 +169,25 @@
326 reverse=reverse,
327 )
328
329- def filter_by_avail(self, free_set, size, copies_needed):
330- assert isinstance(size, int) and size >= 1
331- assert isinstance(copies_needed, int) and 1 <= copies_needed <= 3
332+ def filter_by_avail(self, free, size, copies, threshold):
333+ assert isinstance(size, int) and size > 0
334+ assert isinstance(copies, int) and 1 <= copies <= 3
335+ assert isinstance(threshold, int) and threshold > 0
336 stores = []
337- required_avail = size + MIN_FREE_SPACE
338+ required_avail = size + threshold
339 for fs in self.sort_by_avail():
340- if fs.id in free_set and fs.statvfs().avail >= required_avail:
341+ if fs.id in free and fs.statvfs().avail >= required_avail:
342 stores.append(fs)
343- if len(stores) >= copies_needed:
344+ if len(stores) >= copies:
345 break
346 return stores
347
348- def find_dst_store(self, size):
349+ def find_dst_store(self, size, threshold):
350 stores = self.sort_by_avail()
351 if not stores:
352 return
353 fs = stores[0]
354- if fs.statvfs().avail >= size + MIN_FREE_SPACE:
355+ if fs.statvfs().avail >= size + threshold:
356 return fs
357
358 def local_stores(self):
359
360=== modified file 'dmedia/metastore.py'
361--- dmedia/metastore.py 2013-10-30 03:26:10 +0000
362+++ dmedia/metastore.py 2013-11-21 03:22:50 +0000
363@@ -67,7 +67,7 @@
364 from random import SystemRandom
365 from copy import deepcopy
366
367-from dbase32 import log_id
368+from dbase32 import log_id, isdb32
369 from filestore import FileStore, CorruptFile, FileNotFound, check_root_hash
370 from microfiber import NotFound, Conflict, BadRequest, BulkConflict
371 from microfiber import id_slice_iter, dumps
372@@ -90,9 +90,9 @@
373 VERIFY_BY_MTIME = DOWNGRADE_BY_MTIME // 8
374 VERIFY_BY_VERIFIED = DOWNGRADE_BY_VERIFIED // 2
375
376-GiB = 1024**3
377-RECLAIM_BYTES = 64 * GiB
378-
379+GB = 1000000000
380+MIN_BYTES_FREE = 16 * GB
381+MAX_BYTES_FREE = 64 * GB
382
383
384 class TimeDelta:
385@@ -140,6 +140,139 @@
386 return d[key]
387
388
389+def get_int(d, key):
390+ """
391+ Force value for *key* in *d* to be an ``int`` >= 0.
392+
393+ For example:
394+
395+ >>> doc = {'foo': 'BAR'}
396+ >>> get_int(doc, 'foo')
397+ 0
398+ >>> doc
399+ {'foo': 0}
400+
401+ """
402+ if not isinstance(d, dict):
403+ raise TypeError(TYPE_ERROR.format('d', dict, type(d), d))
404+ if not isinstance(key, str):
405+ raise TypeError(TYPE_ERROR.format('key', str, type(key), key))
406+ value = d.get(key)
407+ if isinstance(value, int) and value >= 0:
408+ return value
409+ d[key] = 0
410+ return d[key]
411+
412+
413+def get_rank(doc):
414+ """
415+ Calculate the rank of the file represented by *doc*.
416+
417+ The rank of a file is the number of physical drives its assumed to be stored
418+ upon plus the sum of the assumed durability of those copies, basically::
419+
420+ rank = len(doc['stored']) + sum(v['copies'] for v in doc['stored'].values())
421+
422+ However, this function can cope with an arbitrarily broken *doc*, as long as
423+ *doc* is at least a ``dict`` instance. For example:
424+
425+ >>> doc = {
426+ ... 'stored': {
427+ ... '333333333333333333333333': {'copies': 1},
428+ ... '999999999999999999999999': {'copies': -6},
429+ ... 'AAAAAAAAAAAAAAAAAAAAAAAA': 'junk',
430+ ... 'YYYYYYYYYYYYYYYY': 'store_id too short',
431+ ... 42: 'the ultimate key to the ultimate value',
432+ ... },
433+ ... }
434+ >>> get_rank(doc)
435+ 4
436+
437+ Any needed schema coercion is done in-place:
438+
439+ >>> doc == {
440+ ... 'stored': {
441+ ... '333333333333333333333333': {'copies': 1},
442+ ... '999999999999999999999999': {'copies': 0},
443+ ... 'AAAAAAAAAAAAAAAAAAAAAAAA': {'copies': 0},
444+ ... },
445+ ... }
446+ True
447+
448+ It even works with an empty doc:
449+
450+ >>> doc = {}
451+ >>> get_rank(doc)
452+ 0
453+ >>> doc
454+ {'stored': {}}
455+
456+ The rank of a file is used to order (prioritize) the copy increasing
457+ behavior, which is done from lowest rank to highest rank (from most fragile
458+ to least fragile).
459+
460+ Also see the "file/rank" CouchDB view function in `dmedia.views`.
461+ """
462+ stored = get_dict(doc, 'stored')
463+ copies = 0
464+ for key in tuple(stored):
465+ if isinstance(key, str) and len(key) == 24 and isdb32(key):
466+ value = get_dict(stored, key)
467+ copies += get_int(value, 'copies')
468+ else:
469+ del stored[key]
470+ return min(3, len(stored)) + min(3, copies)
471+
472+
473+def get_copies(doc):
474+ """
475+ Calculate the durability of the file represented by *doc*.
476+
477+ For example:
478+
479+ >>> doc = {
480+ ... 'stored': {
481+ ... '333333333333333333333333': {'copies': 1},
482+ ... '999999999999999999999999': {'copies': -6},
483+ ... 'AAAAAAAAAAAAAAAAAAAAAAAA': 'junk',
484+ ... 'YYYYYYYYYYYYYYYY': 'store_id too short',
485+ ... 42: 'the ultimate key to the ultimate value',
486+ ... },
487+ ... }
488+ >>> get_copies(doc)
489+ 1
490+
491+ Any needed schema coercion is done in-place:
492+
493+ >>> doc == {
494+ ... 'stored': {
495+ ... '333333333333333333333333': {'copies': 1},
496+ ... '999999999999999999999999': {'copies': 0},
497+ ... 'AAAAAAAAAAAAAAAAAAAAAAAA': {'copies': 0},
498+ ... },
499+ ... }
500+ True
501+
502+ It even works with an empty doc:
503+
504+ >>> doc = {}
505+ >>> get_copies(doc)
506+ 0
507+ >>> doc
508+ {'stored': {}}
509+
510+ """
511+ stored = get_dict(doc, 'stored')
512+ copies = 0
513+ for key in tuple(stored):
514+ if isinstance(key, str) and len(key) == 24 and isdb32(key):
515+ value = get_dict(stored, key)
516+ copies += get_int(value, 'copies')
517+ else:
518+ del stored[key]
519+ return copies
520+
521+
522 def get_mtime(fs, _id):
523 return int(fs.stat(_id).mtime)
524
525@@ -564,7 +697,7 @@
526 count += len(docs)
527 try:
528 self.db.save_many(docs)
529- except BulkConflict:
530+ except BulkConflict as e:
531 log.exception('Conflict purging %s', store_id)
532 count -= len(e.conflicts)
533 try:
534@@ -598,21 +731,21 @@
535
536 A fundamental design tenet of Dmedia is that it doesn't particularly
537 trust its metadata, and instead does frequent reality checks. This
538- allows Dmedia to work even though removable storage is constantly
539- "offline". In other distributed file-systems, this is usually called
540- being in a "network-partitioned" state.
541+ allows Dmedia to work even though removable storage is often offline,
542+ meaning the overall Dmedia library is often in a network-partitioned
543+ state even when all the peers in the library might be online.
544
545 Dmedia deals with removable storage via a quickly decaying confidence
546 in its metadata. If a removable drive hasn't been connected longer
547 than some threshold, Dmedia will update all those copies to count for
548 zero durability.
549
550- And whenever a removable drive (on any drive for that matter) is
551- connected, Dmedia immediately checks to see what files are actually on
552- the drive, and whether they have good integrity.
553+ Whenever a removable drive (or any drive for that matter) is connected,
554+ Dmedia immediately checks to see what files are actually on the drive,
555+ and whether they have good integrity.
556
557 `MetaStore.scan()` is the most important reality check that Dmedia does
558- because it's fast and can therefor be done quite often. Thousands of
559+ because it's fast and can therefor be done frequently. Thousands of
560 files can be scanned in a few seconds.
561
562 The scan insures that for every file expected in this file-store, the
563@@ -625,12 +758,13 @@
564 the file-store. Then the doc is updated accordingly marking the file as
565 being corrupt in this file-store, and the doc is saved.
566
567- If the file doesn't have the expected mtime is this file-store, this
568+ If the file doesn't have the expected mtime in this file-store, this
569 copy gets downgraded to zero copies worth of durability, and the last
570 verification timestamp is deleted, if present. This will put the file
571 first in line for full content-hash verification. If the verification
572- passes, the durability is raised back to the appropriate number of
573- copies.
574+ passes, the durability will be raised back to the appropriate number of
575+ copies (although note this is done by `MetaStore.verify_by_downgraded()`,
576+ not by this method).
577
578 :param fs: a `FileStore` instance
579 """
580@@ -677,6 +811,7 @@
581 except NotFound:
582 doc = deepcopy(fs.doc)
583 doc['atime'] = int(time.time())
584+ doc['bytes_avail'] = fs.statvfs().avail
585 self.db.save(doc)
586 t.log('scan %r files in %r', count, fs)
587 return count
588@@ -856,37 +991,60 @@
589 new = create_stored(doc['_id'], fs)
590 return self.db.update(mark_added, doc, new)
591
592- def iter_fragile(self, monitor=False):
593- """
594- Yield doc for each fragile file.
595- """
596- for rank in range(6):
597- result = self.db.view('file', 'rank', key=rank, update_seq=True)
598- update_seq = result.get('update_seq')
599- ids = [row['id'] for row in result['rows']]
600- del result # result might be quite large, free some memory
601+ def iter_files_at_rank(self, rank):
602+ if not isinstance(rank, int):
603+ raise TypeError(TYPE_ERROR.format('rank', int, type(rank), rank))
604+ if not (0 <= rank <= 5):
605+ raise ValueError('Need 0 <= rank <= 5; got {}'.format(rank))
606+ LIMIT = 50
607+ kw = {
608+ 'limit': LIMIT,
609+ 'key': rank,
610+ }
611+ while True:
612+ rows = self.db.view('file', 'rank', **kw)['rows']
613+ if not rows:
614+ break
615+ ids = [r['id'] for r in rows]
616+ if ids[0] == kw.get('startkey_docid'):
617+ ids.pop(0)
618+ if not ids:
619+ break
620+ log.info('Considering %d files at rank=%d starting at %s',
621+ len(ids), rank, ids[0]
622+ )
623 random.shuffle(ids)
624- log.info('vigilance: %d files at rank=%d', len(ids), rank)
625 for _id in ids:
626- yield self.db.get(_id)
627- if not monitor:
628- return
629-
630- # Now we enter an event-based loop using the _changes feed:
631- if update_seq is None:
632- update_seq = self.db.get()['update_seq']
633+ try:
634+ doc = self.db.get(_id)
635+ doc_rank = get_rank(doc)
636+ if doc_rank <= rank:
637+ yield doc
638+ else:
639+ log.info('Now at rank %d > %d, skipping %s',
640+ doc_rank, rank, doc.get('_id')
641+ )
642+ except NotFound:
643+ log.warning('doc NotFound for %s at rank=%d', _id, rank)
644+ if len(rows) < LIMIT:
645+ break
646+ kw['startkey_docid'] = rows[-1]['id']
647+
648+ def iter_fragile_files(self):
649+ for rank in range(6):
650+ for doc in self.iter_files_at_rank(rank):
651+ yield doc
652+
653+ def wait_for_fragile_files(self, last_seq):
654 kw = {
655 'feed': 'longpoll',
656 'include_docs': True,
657 'filter': 'file/fragile',
658- 'since': update_seq,
659+ 'since': last_seq,
660 }
661 while True:
662 try:
663- result = self.db.get('_changes', **kw)
664- for row in result['results']:
665- yield row['doc']
666- kw['since'] = result['last_seq']
667+ return self.db.get('_changes', **kw)
668 # FIXME: Sometimes we get a 400 Bad Request from CouchDB, perhaps
669 # when `since` gets ahead of the `update_seq` as viewed by the
670 # changes feed? By excepting `BadRequest` here, we prevent the
671@@ -900,21 +1058,29 @@
672 except (ResponseNotReady, BadRequest):
673 pass
674
675- def iter_actionable_fragile(self, connected, monitor=False):
676- """
677- Yield doc for each fragile file that this node might be able to fix.
678-
679- To be "actionable", this machine must have at least one currently
680- connected FileStore (drive) that does *not* already contain a copy of
681- the fragile file.
682- """
683- assert isinstance(connected, frozenset)
684- for doc in self.iter_fragile(monitor):
685- stored = frozenset(get_dict(doc, 'stored'))
686- if (connected - stored):
687- yield (doc, stored)
688-
689- def reclaim(self, fs, threshold=RECLAIM_BYTES):
690+ def iter_preempt_files(self):
691+ kw = {
692+ 'limit': 100,
693+ 'descending': True,
694+ }
695+ rows = self.db.view('file', 'preempt', **kw)['rows']
696+ if not rows:
697+ return
698+ ids = [r['id'] for r in rows]
699+ log.info('Considering %d files for preemptive copy increasing', len(ids))
700+ random.shuffle(ids)
701+ for _id in ids:
702+ try:
703+ doc = self.db.get(_id)
704+ copies = get_copies(doc)
705+ if copies == 3:
706+ yield doc
707+ else:
708+ log.info('Now at copies=%d, skipping %s', copies, _id)
709+ except NotFound:
710+ log.warning('preempt doc NotFound for %s', _id)
711+
712+ def reclaim(self, fs, threshold=MAX_BYTES_FREE):
713 count = 0
714 size = 0
715 t = TimeDelta()
716@@ -936,7 +1102,7 @@
717 t.log('reclaim %s in %r', count_and_size(count, size), fs)
718 return (count, size)
719
720- def reclaim_all(self, threshold=RECLAIM_BYTES):
721+ def reclaim_all(self, threshold=MAX_BYTES_FREE):
722 try:
723 count = 0
724 size = 0
725
726=== modified file 'dmedia/tests/test_core.py'
727--- dmedia/tests/test_core.py 2013-10-24 02:59:18 +0000
728+++ dmedia/tests/test_core.py 2013-11-21 03:22:50 +0000
729@@ -230,6 +230,150 @@
730 })
731
732
733+class TestVigilanceMocked(TestCase):
734+ def test_up_rank(self):
735+ class Mocked(core.Vigilance):
736+ def __init__(self, local, remote):
737+ self.local = frozenset(local)
738+ self.remote = frozenset(remote)
739+ self._calls = []
740+
741+ def up_rank_by_verifying(self, doc, downgraded):
742+ self._calls.extend(('verify', doc, downgraded))
743+ return doc
744+
745+ def up_rank_by_copying(self, doc, free, threshold):
746+ self._calls.extend(('copy', doc, free, threshold))
747+ return doc
748+
749+ def up_rank_by_downloading(self, doc, remote, threshold):
750+ self._calls.extend(('download', doc, remote, threshold))
751+ return doc
752+
753+ local = tuple(random_id() for i in range(2))
754+ remote = tuple(random_id() for i in range(2))
755+ mocked = Mocked(local, remote)
756+
757+ # Verify, one local:
758+ doc = {
759+ 'stored': {
760+ local[0]: {'copies': 0},
761+ },
762+ }
763+ self.assertIs(mocked.up_rank(doc, 17), doc)
764+ self.assertEqual(mocked._calls,
765+ ['verify', doc, {local[0]}]
766+ )
767+
768+ # Verify, two local:
769+ doc = {
770+ 'stored': {
771+ local[0]: {'copies': 0},
772+ local[1]: {'copies': 1},
773+ },
774+ }
775+ mocked._calls.clear()
776+ self.assertIs(mocked.up_rank(doc, 17), doc)
777+ self.assertEqual(mocked._calls,
778+ ['verify', doc, {local[0]}]
779+ )
780+
781+ # Verify, one local, one remote:
782+ doc = {
783+ 'stored': {
784+ local[0]: {'copies': 0},
785+ remote[0]: {'copies': 1},
786+ },
787+ }
788+ mocked._calls.clear()
789+ self.assertIs(mocked.up_rank(doc, 17), doc)
790+ self.assertEqual(mocked._calls,
791+ ['verify', doc, {local[0]}]
792+ )
793+
794+ # Copy, one local, one remote:
795+ doc = {
796+ 'stored': {
797+ local[0]: {'copies': 1},
798+ remote[0]: {'copies': 1},
799+ },
800+ }
801+ mocked._calls.clear()
802+ self.assertIs(mocked.up_rank(doc, 17), doc)
803+ self.assertEqual(mocked._calls,
804+ ['copy', doc, {local[1]}, 17]
805+ )
806+
807+ # Copy, two local, one remote:
808+ doc = {
809+ 'stored': {
810+ local[0]: {'copies': 1},
811+ local[1]: {'copies': 1},
812+ remote[0]: {'copies': 1},
813+ },
814+ }
815+ mocked._calls.clear()
816+ self.assertIsNone(mocked.up_rank(doc, 17))
817+ self.assertEqual(mocked._calls, [])
818+
819+ # Download, one remote:
820+ doc = {
821+ 'stored': {
822+ remote[0]: {'copies': 0},
823+ },
824+ }
825+ mocked._calls.clear()
826+ self.assertIs(mocked.up_rank(doc, 17), doc)
827+ self.assertEqual(mocked._calls,
828+ ['download', doc, {remote[0]}, 17]
829+ )
830+
831+ # Download, two remote:
832+ doc = {
833+ 'stored': {
834+ remote[0]: {'copies': 0},
835+ remote[1]: {'copies': 1},
836+ },
837+ }
838+ mocked._calls.clear()
839+ self.assertIs(mocked.up_rank(doc, 17), doc)
840+ self.assertEqual(mocked._calls,
841+ ['download', doc, set(remote), 17]
842+ )
843+
844+ # Available in neither local nor remote:
845+ doc = {
846+ 'stored': {
847+ random_id(): {'copies': 0},
848+ random_id(): {'copies': 1},
849+ },
850+ }
851+ mocked._calls.clear()
852+ self.assertIsNone(mocked.up_rank(doc, 17))
853+ self.assertEqual(mocked._calls, [])
854+
855+ # Empty doc['stored']:
856+ doc = {'stored': {}}
857+ mocked._calls.clear()
858+ self.assertIsNone(mocked.up_rank(doc, 17))
859+ self.assertEqual(mocked._calls, [])
860+
861+
862+class TestVigilance(CouchCase):
863+ def test_init(self):
864+ db = util.get_db(self.env, True)
865+ ms = MetaStore(db)
866+ inst = core.Vigilance(ms, None)
867+ self.assertIs(inst.ms, ms)
868+ self.assertIsInstance(inst.stores, LocalStores)
869+ self.assertIsInstance(inst.local, frozenset)
870+ self.assertEqual(inst.local, frozenset())
871+ self.assertIsInstance(inst.remote, frozenset)
872+ self.assertEqual(inst.remote, frozenset())
873+ self.assertEqual(inst.clients, {})
874+ self.assertEqual(inst.store_to_peer, {})
875+
876+
877 class TestTaskQueue(TestCase):
878 def test_init(self):
879 tq = core.TaskQueue()
880
881=== modified file 'dmedia/tests/test_local.py'
882--- dmedia/tests/test_local.py 2013-08-25 20:30:25 +0000
883+++ dmedia/tests/test_local.py 2013-11-21 03:22:50 +0000
884@@ -181,6 +181,43 @@
885 self.assertEqual(inst.fast, set())
886 self.assertEqual(inst.slow, set())
887
888+ def test_filter_by_avail(self):
889+ # FIXME: Improve this once we can mock FileStore.statvfs()
890+ inst = local.LocalStores()
891+
892+ # Empty
893+ self.assertEqual(inst.filter_by_avail({}, 1, 1, 1), [])
894+ free = {random_id(), random_id()}
895+ self.assertEqual(inst.filter_by_avail(free, 1, 1, 1), [])
896+
897+ # One FileStore
898+ fs1 = TempFileStore()
899+ inst.add(fs1)
900+ self.assertEqual(inst.filter_by_avail({}, 1, 1, 1), [])
901+ free = {random_id(), random_id()}
902+ self.assertEqual(inst.filter_by_avail(free, 1, 1, 1), [])
903+ free = {fs1.id, random_id(), random_id()}
904+ self.assertEqual(inst.filter_by_avail(free, 1, 1, 1), [fs1])
905+
906+ # Two FileStore
907+ fs2 = TempFileStore()
908+ inst.add(fs2)
909+ self.assertEqual(inst.filter_by_avail({}, 1, 1, 1), [])
910+ free = {random_id(), random_id()}
911+ self.assertEqual(inst.filter_by_avail(free, 1, 1, 1), [])
912+ free = {fs1.id, random_id(), random_id()}
913+ self.assertEqual(inst.filter_by_avail(free, 1, 1, 1), [fs1])
914+ free = {fs2.id, random_id(), random_id()}
915+ self.assertEqual(inst.filter_by_avail(free, 1, 1, 1), [fs2])
916+
917+ def test_find_dst_store(self):
918+ # FIXME: Improve this once we can mock FileStore.statvfs()
919+ inst = local.LocalStores()
920+ self.assertIsNone(inst.find_dst_store(1, 1))
921+ fs = TempFileStore()
922+ inst.add(fs)
923+ self.assertIs(inst.find_dst_store(1, 1), fs)
924+
925 def test_local_stores(self):
926 fs1 = TempFileStore(copies=1)
927 fs2 = TempFileStore(copies=0)
928
929=== modified file 'dmedia/tests/test_metastore.py'
930--- dmedia/tests/test_metastore.py 2013-10-30 03:26:10 +0000
931+++ dmedia/tests/test_metastore.py 2013-11-21 03:22:50 +0000
932@@ -44,11 +44,91 @@
933 from dmedia import util, schema, metastore
934 from dmedia.metastore import create_stored, get_mtime
935 from dmedia.constants import TYPE_ERROR
936+from dmedia.units import bytes10
937
938
939 random = SystemRandom()
940
941
942+def doc_id(doc):
943+ """
944+ Used as key function for sorted by doc['_id'].
945+ """
946+ return doc['_id']
947+
948+
949+def build_stored_at_rank(rank, store_ids):
950+ """
951+ Build doc['stored'] for a specific rank.
952+
953+ For example:
954+
955+ >>> store_ids = (
956+ ... '333333333333333333333333',
957+ ... 'AAAAAAAAAAAAAAAAAAAAAAAA',
958+ ... 'YYYYYYYYYYYYYYYYYYYYYYYY',
959+ ... )
960+ >>> build_stored_at_rank(0, store_ids)
961+ {}
962+ >>> build_stored_at_rank(1, store_ids)
963+ {'333333333333333333333333': {'copies': 0}}
964+ >>> build_stored_at_rank(2, store_ids)
965+ {'333333333333333333333333': {'copies': 1}}
966+
967+ """
968+ assert isinstance(rank, int)
969+ assert 0 <= rank <= 6
970+ assert isinstance(store_ids, tuple)
971+ assert len(store_ids) == 3
972+ for _id in store_ids:
973+ assert isinstance(_id, str) and len(_id) == 24 and isdb32(_id)
974+ if rank == 0:
975+ return {}
976+ if rank == 1:
977+ return {
978+ store_ids[0]: {'copies': 0},
979+ }
980+ if rank == 2:
981+ return {
982+ store_ids[0]: {'copies': 1},
983+ }
984+ if rank == 3:
985+ return {
986+ store_ids[0]: {'copies': 1},
987+ store_ids[1]: {'copies': 0},
988+ }
989+ if rank == 4:
990+ return {
991+ store_ids[0]: {'copies': 1},
992+ store_ids[1]: {'copies': 1},
993+ }
994+ if rank == 5:
995+ return {
996+ store_ids[0]: {'copies': 1},
997+ store_ids[1]: {'copies': 1},
998+ store_ids[2]: {'copies': 0},
999+ }
1000+ if rank == 6:
1001+ return {
1002+ store_ids[0]: {'copies': 1},
1003+ store_ids[1]: {'copies': 1},
1004+ store_ids[2]: {'copies': 1},
1005+ }
1006+ raise Exception('should not have reached this point')
1007+
1008+
1009+def build_file_at_rank(_id, rank, store_ids):
1010+ assert isinstance(_id, str) and len(_id) == 48 and isdb32(_id)
1011+ doc = {
1012+ '_id': _id,
1013+ 'type': 'dmedia/file',
1014+ 'origin': 'user',
1015+ 'stored': build_stored_at_rank(rank, store_ids),
1016+ }
1017+ assert metastore.get_rank(doc) == rank
1018+ return doc
1019+
1020+
1021 def create_random_file(fs, db):
1022 tmp_fp = fs.allocate_tmp()
1023 ch = write_random(tmp_fp)
1024@@ -196,7 +276,28 @@
1025 self.assertTrue(
1026 parent // 4 <= metastore.VERIFY_BY_VERIFIED <= parent // 2
1027 )
1028- self.assertGreater(metastore.VERIFY_BY_VERIFIED, metastore.VERIFY_BY_MTIME)
1029+ self.assertGreater(metastore.VERIFY_BY_VERIFIED, metastore.VERIFY_BY_MTIME)
1030+
1031+ def test_GB(self):
1032+ self.assertIsInstance(metastore.GB, int)
1033+ self.assertEqual(metastore.GB, 10 ** 9)
1034+ self.assertEqual(metastore.GB, 1000 ** 3)
1035+ self.assertEqual(bytes10(metastore.GB), '1 GB')
1036+
1037+ def test_MIN_BYTES_FREE(self):
1038+ self.assertIsInstance(metastore.MIN_BYTES_FREE, int)
1039+ self.assertGreaterEqual(metastore.MIN_BYTES_FREE, metastore.GB)
1040+ self.assertEqual(metastore.MIN_BYTES_FREE % metastore.GB, 0)
1041+ self.assertEqual(bytes10(metastore.MIN_BYTES_FREE), '16 GB')
1042+
1043+ def test_MAX_BYTES_FREE(self):
1044+ self.assertIsInstance(metastore.MAX_BYTES_FREE, int)
1045+ self.assertGreaterEqual(metastore.MAX_BYTES_FREE, metastore.GB)
1046+ self.assertEqual(metastore.MAX_BYTES_FREE % metastore.GB, 0)
1047+ self.assertGreaterEqual(metastore.MAX_BYTES_FREE,
1048+ 2 * metastore.MIN_BYTES_FREE
1049+ )
1050+ self.assertEqual(bytes10(metastore.MAX_BYTES_FREE), '64 GB')
1051
1052
1053 class TestFunctions(TestCase):
1054@@ -243,6 +344,123 @@
1055 self.assertEqual(doc, {'foo': {'bar': 0, 'baz': 1}})
1056 self.assertIs(doc['foo'], ret)
1057
1058+ def test_get_int(self):
1059+ # Bad `d` type:
1060+ bad = [random_id(), random_id()]
1061+ with self.assertRaises(TypeError) as cm:
1062+ metastore.get_int(bad, random_id())
1063+ self.assertEqual(
1064+ str(cm.exception),
1065+ TYPE_ERROR.format('d', dict, list, bad)
1066+ )
1067+
1068+ # Bad `key` type:
1069+ bad = random.randint(0, 1000)
1070+ with self.assertRaises(TypeError) as cm:
1071+ metastore.get_int({}, bad)
1072+ self.assertEqual(
1073+ str(cm.exception),
1074+ TYPE_ERROR.format('key', str, int, bad)
1075+ )
1076+
1077+ # Empty:
1078+ doc = {}
1079+ ret = metastore.get_int(doc, 'foo')
1080+ self.assertIsInstance(ret, int)
1081+ self.assertEqual(ret, 0)
1082+ self.assertEqual(doc, {'foo': 0})
1083+ self.assertIs(doc['foo'], ret)
1084+
1085+ # Wrong type:
1086+ doc = {'foo': '17'}
1087+ ret = metastore.get_int(doc, 'foo')
1088+ self.assertIsInstance(ret, int)
1089+ self.assertEqual(ret, 0)
1090+ self.assertEqual(doc, {'foo': 0})
1091+ self.assertIs(doc['foo'], ret)
1092+
1093+ # Trickier wrong type:
1094+ doc = {'foo': 17.0}
1095+ ret = metastore.get_int(doc, 'foo')
1096+ self.assertIsInstance(ret, int)
1097+ self.assertEqual(ret, 0)
1098+ self.assertEqual(doc, {'foo': 0})
1099+ self.assertIs(doc['foo'], ret)
1100+
1101+ # Bad Value:
1102+ doc = {'foo': -17}
1103+ ret = metastore.get_int(doc, 'foo')
1104+ self.assertIsInstance(ret, int)
1105+ self.assertEqual(ret, 0)
1106+ self.assertEqual(doc, {'foo': 0})
1107+ self.assertIs(doc['foo'], ret)
1108+
1109+ # Another bad Value:
1110+ doc = {'foo': -1}
1111+ ret = metastore.get_int(doc, 'foo')
1112+ self.assertIsInstance(ret, int)
1113+ self.assertEqual(ret, 0)
1114+ self.assertEqual(doc, {'foo': 0})
1115+ self.assertIs(doc['foo'], ret)
1116+
1117+ # All good:
1118+ value = 17
1119+ doc = {'foo': value}
1120+ self.assertIs(metastore.get_int(doc, 'foo'), value)
1121+ self.assertEqual(doc, {'foo': 17})
1122+ self.assertIs(doc['foo'], value)
1123+
1124+ # Also all good:
1125+ value = 0
1126+ doc = {'foo': value}
1127+ self.assertIs(metastore.get_int(doc, 'foo'), value)
1128+ self.assertEqual(doc, {'foo': 0})
1129+ self.assertIs(doc['foo'], value)
1130+
1131+ def test_get_rank(self):
1132+ file_id = random_file_id()
1133+ store_ids = tuple(random_id() for i in range(3))
1134+ for rank in range(7):
1135+ doc = build_file_at_rank(file_id, rank, store_ids)
1136+ self.assertEqual(metastore.get_rank(doc), rank)
1137+
1138+ # Empty doc
1139+ doc = {}
1140+ self.assertEqual(metastore.get_rank(doc), 0)
1141+ self.assertEqual(doc, {'stored': {}})
1142+
1143+ # Empty doc['stored']
1144+ doc = {'stored': {}}
1145+ self.assertEqual(metastore.get_rank(doc), 0)
1146+ self.assertEqual(doc, {'stored': {}})
1147+
1148+ # All kinds of broken:
1149+ store_ids = tuple(random_id() for i in range(6))
1150+ doc = {
1151+ 'stored': {
1152+ store_ids[0]: {'copies': 1},
1153+ store_ids[1]: {'copies': '17'},
1154+ store_ids[2]: {'copies': -18},
1155+ store_ids[3]: {},
1156+ store_ids[4]: 'hello',
1157+ store_ids[5]: 3,
1158+ random_id(10): {'copies': 1},
1159+ ('a' * 24): {'copies': 1},
1160+ 42: {'copies': 1},
1161+ },
1162+ }
1163+ self.assertEqual(metastore.get_rank(doc), 4)
1164+ self.assertEqual(doc, {
1165+ 'stored': {
1166+ store_ids[0]: {'copies': 1},
1167+ store_ids[1]: {'copies': 0},
1168+ store_ids[2]: {'copies': 0},
1169+ store_ids[3]: {'copies': 0},
1170+ store_ids[4]: {'copies': 0},
1171+ store_ids[5]: {'copies': 0},
1172+ },
1173+ })
1174+
1175 def test_get_mtime(self):
1176 fs = TempFileStore()
1177 _id = random_file_id()
1178@@ -2791,6 +3009,7 @@
1179
1180 doc = db.get(fs.id)
1181 self.assertTrue(doc['_rev'].startswith('2-'))
1182+ self.assertIn('bytes_avail', doc)
1183 atime = doc.get('atime')
1184 self.assertIsInstance(atime, int)
1185 self.assertLessEqual(atime, int(time.time()))
1186@@ -3643,59 +3862,93 @@
1187 },
1188 })
1189
1190- def test_iter_fragile(self):
1191+ def test_iter_files_at_rank(self):
1192 db = util.get_db(self.env, True)
1193 ms = metastore.MetaStore(db)
1194
1195+ # Bad rank type:
1196+ with self.assertRaises(TypeError) as cm:
1197+ list(ms.iter_files_at_rank(1.0))
1198+ self.assertEqual(str(cm.exception),
1199+ TYPE_ERROR.format('rank', int, float, 1.0)
1200+ )
1201+
1202+ # Bad rank value:
1203+ with self.assertRaises(ValueError) as cm:
1204+ list(ms.iter_files_at_rank(-1))
1205+ self.assertEqual(str(cm.exception), 'Need 0 <= rank <= 5; got -1')
1206+ with self.assertRaises(ValueError) as cm:
1207+ list(ms.iter_files_at_rank(6))
1208+ self.assertEqual(str(cm.exception), 'Need 0 <= rank <= 5; got 6')
1209+
1210 # Test when no files are in the library:
1211- self.assertEqual(list(ms.iter_fragile()), [])
1212+ self.assertEqual(list(ms.iter_files_at_rank(0)), [])
1213+ self.assertEqual(list(ms.iter_files_at_rank(1)), [])
1214+ self.assertEqual(list(ms.iter_files_at_rank(2)), [])
1215+ self.assertEqual(list(ms.iter_files_at_rank(3)), [])
1216+ self.assertEqual(list(ms.iter_files_at_rank(4)), [])
1217+ self.assertEqual(list(ms.iter_files_at_rank(5)), [])
1218
1219- # Create rank=(0 through 6) test data:
1220- ids = tuple(random_file_id() for i in range(7))
1221+ # Create rank=(0 through 5) test data:
1222 stores = tuple(random_id() for i in range(3))
1223- docs = [
1224+ docs_0 = [
1225 {
1226- '_id': ids[0],
1227+ '_id': random_file_id(),
1228 'type': 'dmedia/file',
1229 'origin': 'user',
1230 'stored': {},
1231- },
1232+ }
1233+ for i in range(100)
1234+ ]
1235+ docs_1 = [
1236 {
1237- '_id': ids[1],
1238+ '_id': random_file_id(),
1239 'type': 'dmedia/file',
1240 'origin': 'user',
1241 'stored': {
1242 stores[0]: {'copies': 0},
1243 },
1244- },
1245+ }
1246+ for i in range(101)
1247+ ]
1248+ docs_2 = [
1249 {
1250- '_id': ids[2],
1251+ '_id': random_file_id(),
1252 'type': 'dmedia/file',
1253 'origin': 'user',
1254 'stored': {
1255 stores[0]: {'copies': 1},
1256 },
1257- },
1258+ }
1259+ for i in range(102)
1260+ ]
1261+ docs_3 = [
1262 {
1263- '_id': ids[3],
1264+ '_id': random_file_id(),
1265 'type': 'dmedia/file',
1266 'origin': 'user',
1267 'stored': {
1268 stores[0]: {'copies': 1},
1269 stores[1]: {'copies': 0},
1270 },
1271- },
1272+ }
1273+ for i in range(103)
1274+ ]
1275+ docs_4 = [
1276 {
1277- '_id': ids[4],
1278+ '_id': random_file_id(),
1279 'type': 'dmedia/file',
1280 'origin': 'user',
1281 'stored': {
1282 stores[0]: {'copies': 1},
1283 stores[1]: {'copies': 1},
1284 },
1285- },
1286+ }
1287+ for i in range(104)
1288+ ]
1289+ docs_5 = [
1290 {
1291- '_id': ids[5],
1292+ '_id': random_file_id(),
1293 'type': 'dmedia/file',
1294 'origin': 'user',
1295 'stored': {
1296@@ -3703,9 +3956,181 @@
1297 stores[1]: {'copies': 1},
1298 stores[2]: {'copies': 0},
1299 },
1300- },
1301+ }
1302+ for i in range(105)
1303+ ]
1304+ docs = []
1305+ doc_groups = (docs_0, docs_1, docs_2, docs_3, docs_4, docs_5)
1306+ for docs_n in doc_groups:
1307+ docs.extend(docs_n)
1308+ docs_n.sort(key=doc_id)
1309+ self.assertEqual(len(docs), 615)
1310+ db.save_many(docs)
1311+
1312+ # Test that for each rank, we get the expected docs and no duplicates:
1313+ for (n, docs_n) in enumerate(doc_groups):
1314+ result = list(ms.iter_files_at_rank(n))
1315+ self.assertEqual(len(result), 100 + n)
1316+ self.assertNotEqual(result, docs_n) # Due to random.shuffle()
1317+ self.assertEqual(sorted(result, key=doc_id), docs_n)
1318+
1319+ # Similar to above, except this time we're modifying the docs as they're
1320+ # yielded so they're bumped up to rank=6 in the file/rank view:
1321+ self.assertEqual(len(doc_groups), 6)
1322+ self.assertEqual(db.view('file', 'rank', key=6)['rows'], [])
1323+ for (n, docs_n) in enumerate(doc_groups):
1324+ result = []
1325+ for doc in ms.iter_files_at_rank(n):
1326+ result.append(doc)
1327+ new = deepcopy(doc)
1328+ new['stored'] = {
1329+ stores[0]: {'copies': 1},
1330+ stores[1]: {'copies': 1},
1331+ stores[2]: {'copies': 1},
1332+ }
1333+ db.save(new)
1334+ self.assertEqual(len(result), 100 + n)
1335+ self.assertNotEqual(result, docs_n) # Due to random.shuffle()
1336+ self.assertEqual(sorted(result, key=doc_id), docs_n)
1337+ self.assertEqual(list(ms.iter_files_at_rank(n)), [])
1338+
1339+ # Double check that rank 0 through 5 are still returning no docs:
1340+ self.assertEqual(list(ms.iter_files_at_rank(0)), [])
1341+ self.assertEqual(list(ms.iter_files_at_rank(1)), [])
1342+ self.assertEqual(list(ms.iter_files_at_rank(2)), [])
1343+ self.assertEqual(list(ms.iter_files_at_rank(3)), [])
1344+ self.assertEqual(list(ms.iter_files_at_rank(4)), [])
1345+ self.assertEqual(list(ms.iter_files_at_rank(5)), [])
1346+
1347+ # And check that all the docs are still at rank=6 and _rev=2:
1348+ ids = sorted(d['_id'] for d in docs)
1349+ rows = db.view('file', 'rank', key=6)['rows']
1350+ self.assertEqual(len(rows), 615)
1351+ self.assertEqual([r['id'] for r in rows], ids)
1352+ for doc in db.get_many(ids):
1353+ self.assertEqual(doc['_rev'][:2], '2-')
1354+
1355+ def test_iter_files_at_rank_2(self):
1356+ """
1357+ Ensure that get_rank() is used to filter out greater than current rank.
1358+ """
1359+ db = util.get_db(self.env, True)
1360+ ms = metastore.MetaStore(db)
1361+ for rank in range(6):
1362+ ids = tuple(random_file_id() for i in range(50))
1363+ store_ids = tuple(random_id() for i in range(3))
1364+ docs = [build_file_at_rank(_id, rank, store_ids) for _id in ids]
1365+ db.save_many(docs)
1366+ dmap = dict((d['_id'], d) for d in docs)
1367+ docs.sort(key=doc_id)
1368+
1369+ # Test with no modification:
1370+ result = list(ms.iter_files_at_rank(rank))
1371+ self.assertEqual(len(result), 50)
1372+ self.assertNotEqual(result, docs) # Due to random.shuffle()
1373+ self.assertEqual(sorted(result, key=doc_id), docs)
1374+
1375+ # Adjust 17 files to rank+1 after the first doc is yielded:
1376+ include = None
1377+ result = []
1378+ for doc in ms.iter_files_at_rank(rank):
1379+ result.append(doc)
1380+ if include is None:
1381+ include = {doc['_id']}
1382+ remaining = set(ids) - include
1383+ remove = random.sample(remaining, 17)
1384+ include.update(remaining - set(remove))
1385+ rdocs = [dmap[_id] for _id in remove]
1386+ for rdoc in rdocs:
1387+ rdoc['stored'] = build_stored_at_rank(rank + 1, store_ids)
1388+ self.assertEqual(metastore.get_rank(rdoc), rank + 1)
1389+ db.save_many(rdocs)
1390+ expected = [dmap[_id] for _id in include]
1391+ expected.sort(key=doc_id)
1392+ self.assertEqual(len(expected), 33)
1393+ self.assertEqual(len(result), 33)
1394+ self.assertNotEqual(result, expected) # Due to random.shuffle()
1395+ self.assertEqual(sorted(result, key=doc_id), expected)
1396+
1397+ # Now check rank+1, unless we're at rank=5:
1398+ if rank < 5:
1399+ rdocs.sort(key=doc_id)
1400+ result = list(ms.iter_files_at_rank(rank + 1))
1401+ self.assertEqual(len(result), 17)
1402+ self.assertNotEqual(result, rdocs) # Due to random.shuffle()
1403+ self.assertEqual(sorted(result, key=doc_id), rdocs)
1404+ # Clean up for rank+1:
1405+ for rdoc in rdocs:
1406+ rdoc['_deleted'] = True
1407+ db.save_many(rdocs)
1408+ self.assertEqual(list(ms.iter_files_at_rank(rank + 1)), [])
1409+
1410+ def test_iter_fragile_files(self):
1411+ db = util.get_db(self.env, True)
1412+
1413+ # Test with a mocked MetaStore.iter_files_at_rank():
1414+ class Mocked(metastore.MetaStore):
1415+ def __init__(self, db, log_db=None):
1416+ super().__init__(db, log_db)
1417+ self._calls = []
1418+ self._ranks = tuple(
1419+ tuple(random_id() for i in range(25))
1420+ for rank in range(6)
1421+ )
1422+
1423+ def iter_files_at_rank(self, rank):
1424+ assert isinstance(rank, int)
1425+ assert 0 <= rank <= 5
1426+ self._calls.append(rank)
1427+ for _id in self._ranks[rank]:
1428+ yield _id
1429+
1430+ mocked = Mocked(db)
1431+ expected = []
1432+ for ids in mocked._ranks:
1433+ expected.extend(ids)
1434+ self.assertEqual(list(mocked.iter_fragile_files()), expected)
1435+ self.assertEqual(mocked._calls, [0, 1, 2, 3, 4, 5])
1436+
1437+ # Now do a live test:
1438+ ms = metastore.MetaStore(db)
1439+ self.assertEqual(list(ms.iter_fragile_files()), [])
1440+ store_ids = tuple(random_id() for i in range(3))
1441+ docs = [
1442+ build_file_at_rank(random_file_id(), rank, store_ids)
1443+ for rank in range(7)
1444+ ]
1445+ db.save_many(docs)
1446+ self.assertEqual(list(ms.iter_fragile_files()), docs[:-1])
1447+ for doc in docs:
1448+ doc['_deleted'] = True
1449+ db.save_many(docs)
1450+ self.assertEqual(list(ms.iter_fragile_files()), [])
1451+
1452+ # Test pushing up through ranks:
1453+ docs = [
1454+ build_file_at_rank(random_file_id(), 0, store_ids)
1455+ for i in range(100)
1456+ ]
1457+ db.save_many(docs)
1458+ docs.sort(key=doc_id)
1459+ for rank in range(6):
1460+ result = list(ms.iter_fragile_files())
1461+ self.assertEqual(len(result), 100)
1462+ self.assertNotEqual(result, docs) # Due to random.shuffle()
1463+ self.assertEqual(sorted(result, key=doc_id), docs)
1464+ for doc in docs:
1465+ doc['stored'] = build_stored_at_rank(rank + 1, store_ids)
1466+ db.save_many(docs)
1467+
1468+ def test_wait_for_fragile_files(self):
1469+ db = util.get_db(self.env, True)
1470+ ms = metastore.MetaStore(db)
1471+
1472+ stores = tuple(random_id() for i in range(3))
1473+ docs = [
1474 {
1475- '_id': ids[6],
1476+ '_id': random_file_id(),
1477 'type': 'dmedia/file',
1478 'origin': 'user',
1479 'stored': {
1480@@ -3713,104 +4138,57 @@
1481 stores[1]: {'copies': 1},
1482 stores[2]: {'copies': 1},
1483 },
1484- },
1485+ }
1486+ for i in range(4)
1487 ]
1488 db.save_many(docs)
1489-
1490- # We should get docs[0:6]:
1491- self.assertEqual(list(ms.iter_fragile()), docs[0:-1])
1492-
1493- # Docs should not be changed:
1494- self.assertEqual(db.get_many(ids), docs)
1495-
1496- def test_iter_actionable_fragile(self):
1497+ last_seq = db.get()['update_seq']
1498+ for doc in docs:
1499+ del doc['stored'][stores[0]]
1500+ db.save(doc)
1501+ result = ms.wait_for_fragile_files(last_seq)
1502+ self.assertEqual(result, {
1503+ 'last_seq': last_seq + 1,
1504+ 'results': [
1505+ {
1506+ 'changes': [{'rev': doc['_rev']}],
1507+ 'doc': doc,
1508+ 'id': doc['_id'],
1509+ 'seq': last_seq + 1,
1510+ }
1511+ ],
1512+ })
1513+ last_seq = result['last_seq']
1514+
1515+ def test_iter_preempt_files(self):
1516 db = util.get_db(self.env, True)
1517 ms = metastore.MetaStore(db)
1518
1519- store_id1 = random_id()
1520- store_id2 = random_id()
1521- store_id3 = random_id()
1522- store_id4 = random_id()
1523- empty = frozenset()
1524- one = frozenset([store_id1])
1525- two = frozenset([store_id1, store_id2])
1526- three = frozenset([store_id1, store_id2, store_id3])
1527-
1528- id1 = random_file_id()
1529- id2 = random_file_id()
1530- id3 = random_file_id()
1531- doc1 = {
1532- '_id': id1,
1533- 'type': 'dmedia/file',
1534- 'origin': 'user',
1535- 'stored': {
1536- store_id1: {'copies': 0},
1537- },
1538- }
1539- doc2 = {
1540- '_id': id2,
1541- 'type': 'dmedia/file',
1542- 'origin': 'user',
1543- 'stored': {
1544- store_id1: {'copies': 0},
1545- store_id4: {'copies': 1},
1546- },
1547- }
1548- doc3 = {
1549- '_id': id3,
1550- 'type': 'dmedia/file',
1551- 'origin': 'user',
1552- 'stored': {
1553- store_id1: {'copies': 1},
1554- store_id2: {'copies': 1},
1555- },
1556- }
1557-
1558- # Test when no files are in the library:
1559- self.assertEqual(list(ms.iter_actionable_fragile(empty)), [])
1560- self.assertEqual(list(ms.iter_actionable_fragile(one)), [])
1561- self.assertEqual(list(ms.iter_actionable_fragile(two)), [])
1562- self.assertEqual(list(ms.iter_actionable_fragile(three)), [])
1563-
1564- # All 3 docs should be included:
1565- db.save_many([doc1, doc2, doc3])
1566- self.assertEqual(list(ms.iter_actionable_fragile(three)), [
1567- (doc1, set([store_id1])),
1568- (doc2, set([store_id1, store_id4])),
1569- (doc3, set([store_id1, store_id2])),
1570- ])
1571-
1572- # If only store_id1, store_id2 are connected, doc3 shouldn't be
1573- # actionable:
1574- self.assertEqual(list(ms.iter_actionable_fragile(two)), [
1575- (doc1, set([store_id1])),
1576- (doc2, set([store_id1, store_id4])),
1577- ])
1578-
1579- # All files have a copy in store_id1, so nothing should be returned:
1580- self.assertEqual(list(ms.iter_actionable_fragile(one)), [])
1581-
1582- # If doc2 was only stored on a non-connected store:
1583- doc1['stored'] = {
1584- store_id4: {'copies': 1},
1585- }
1586- db.save(doc1)
1587- self.assertEqual(list(ms.iter_actionable_fragile(one)), [
1588- (doc1, set([store_id4]))
1589- ])
1590-
1591- # If doc2 has sufficent durablity, it shouldn't be included, even though
1592- # there is a free drive where a copy could be created:
1593- doc2['stored'] = {
1594- store_id1: {'copies': 1},
1595- store_id2: {'copies': 1},
1596- store_id4: {'copies': 1},
1597- }
1598- db.save(doc2)
1599- self.assertEqual(list(ms.iter_actionable_fragile(three)), [
1600- (doc1, set([store_id4])),
1601- (doc3, set([store_id1, store_id2])),
1602- ])
1603+ # When empty
1604+ self.assertEqual(list(ms.iter_preempt_files()), [])
1605+
1606+ # With live data:
1607+ store_ids = tuple(random_id() for i in range(3))
1608+ docs = [
1609+ build_file_at_rank(random_file_id(), 6, store_ids)
1610+ for i in range(107)
1611+ ]
1612+ base = int(time.time())
1613+ for (i, doc) in enumerate(docs):
1614+ doc['atime'] = base - i
1615+ db.save_many(docs)
1616+ expected = docs[0:100]
1617+ result = list(ms.iter_preempt_files())
1618+ self.assertEqual(len(result), 100)
1619+ self.assertNotEqual(result, expected) # Due to random.shuffle()
1620+ result.sort(key=lambda d: d['atime'], reverse=True)
1621+ self.assertEqual(result, expected)
1622+
1623+ # Make sure files are excluded when durability isn't 3:
1624+ for doc in docs:
1625+ doc['stored'] = build_stored_at_rank(5, store_ids)
1626+ db.save_many(docs)
1627+ self.assertEqual(list(ms.iter_preempt_files()), [])
1628
1629 def test_reclaim(self):
1630 # FIXME: Till we have a nice way of mocking FileStore.statvfs(), this is
1631
1632=== modified file 'dmedia/tests/test_views.py'
1633--- dmedia/tests/test_views.py 2013-10-12 07:19:16 +0000
1634+++ dmedia/tests/test_views.py 2013-11-21 03:22:50 +0000
1635@@ -1253,6 +1253,155 @@
1636 {'rows': [], 'offset': 0, 'total_rows': 0},
1637 )
1638
1639+ ######################################################################
1640+ # 3rd, test assumptions about how "startkey_docid" and "key" interact:
1641+ db = Database('baz', self.env)
1642+ db.put(None)
1643+ design = self.build_view('rank')
1644+ db.save(design)
1645+
1646+ # Create rank=(0 through 6) test data:
1647+ ranks = tuple(
1648+ tuple(random_file_id() for j in range(17 + i))
1649+ for i in range(7)
1650+ )
1651+ sorted_ranks = tuple(
1652+ tuple(sorted(rank_n)) for rank_n in ranks
1653+ )
1654+ flattened = []
1655+ for (n, rank_n) in enumerate(sorted_ranks):
1656+ flattened.extend((n, _id) for _id in rank_n)
1657+ flattened = tuple(flattened)
1658+ self.assertEqual(len(flattened), 140)
1659+
1660+ stores = tuple(random_id() for i in range(3))
1661+ docs = []
1662+ docs.extend(
1663+ {
1664+ '_id': _id,
1665+ 'type': 'dmedia/file',
1666+ 'origin': 'user',
1667+ 'stored': {},
1668+ }
1669+ for _id in ranks[0]
1670+ )
1671+ docs.extend(
1672+ {
1673+ '_id': _id,
1674+ 'type': 'dmedia/file',
1675+ 'origin': 'user',
1676+ 'stored': {
1677+ stores[0]: {'copies': 0},
1678+ },
1679+ }
1680+ for _id in ranks[1]
1681+ )
1682+ docs.extend(
1683+ {
1684+ '_id': _id,
1685+ 'type': 'dmedia/file',
1686+ 'origin': 'user',
1687+ 'stored': {
1688+ stores[0]: {'copies': 1},
1689+ },
1690+ }
1691+ for _id in ranks[2]
1692+ )
1693+ docs.extend(
1694+ {
1695+ '_id': _id,
1696+ 'type': 'dmedia/file',
1697+ 'origin': 'user',
1698+ 'stored': {
1699+ stores[0]: {'copies': 1},
1700+ stores[1]: {'copies': 0},
1701+ },
1702+ }
1703+ for _id in ranks[3]
1704+ )
1705+ docs.extend(
1706+ {
1707+ '_id': _id,
1708+ 'type': 'dmedia/file',
1709+ 'origin': 'user',
1710+ 'stored': {
1711+ stores[0]: {'copies': 1},
1712+ stores[1]: {'copies': 1},
1713+ },
1714+ }
1715+ for _id in ranks[4]
1716+ )
1717+ docs.extend(
1718+ {
1719+ '_id': _id,
1720+ 'type': 'dmedia/file',
1721+ 'origin': 'user',
1722+ 'stored': {
1723+ stores[0]: {'copies': 1},
1724+ stores[1]: {'copies': 1},
1725+ stores[2]: {'copies': 0},
1726+ },
1727+ }
1728+ for _id in ranks[5]
1729+ )
1730+ docs.extend(
1731+ {
1732+ '_id': _id,
1733+ 'type': 'dmedia/file',
1734+ 'origin': 'user',
1735+ 'stored': {
1736+ stores[0]: {'copies': 1},
1737+ stores[1]: {'copies': 1},
1738+ stores[2]: {'copies': 1},
1739+ },
1740+ }
1741+ for _id in ranks[6]
1742+ )
1743+ self.assertEqual(len(docs), 140)
1744+ db.save_many(docs)
1745+
1746+ # Test that sorting is being done by (key, _id):
1747+ self.assertEqual(
1748+ db.view('file', 'rank'),
1749+ {
1750+ 'offset': 0,
1751+ 'total_rows': 140,
1752+ 'rows': [
1753+ {'key': n, 'id': _id, 'value': None}
1754+ for (n, _id) in flattened
1755+ ],
1756+ },
1757+ )
1758+
1759+ # Test that sorting is being done by _id within a single key, then test
1760+ # that we can use "startkey_docid" as expected:
1761+ offset = 0
1762+ for (n, rank_n) in enumerate(sorted_ranks):
1763+ self.assertEqual(
1764+ db.view('file', 'rank', key=n),
1765+ {
1766+ 'offset': offset,
1767+ 'total_rows': 140,
1768+ 'rows': [
1769+ {'key': n, 'id': _id, 'value': None}
1770+ for _id in rank_n
1771+ ],
1772+ },
1773+ )
1774+ for i in range(len(rank_n)):
1775+ self.assertEqual(
1776+ db.view('file', 'rank', key=n, startkey_docid=rank_n[i]),
1777+ {
1778+ 'offset': offset + i,
1779+ 'total_rows': 140,
1780+ 'rows': [
1781+ {'key': n, 'id': _id, 'value': None}
1782+ for _id in rank_n[i:]
1783+ ],
1784+ },
1785+ )
1786+ offset += len(rank_n)
1787+
1788 def test_fragile(self):
1789 db = Database('foo', self.env)
1790 db.put(None)
1791
1792=== modified file 'dmedia/views.py'
1793--- dmedia/views.py 2013-10-12 07:19:16 +0000
1794+++ dmedia/views.py 2013-11-21 03:22:50 +0000
1795@@ -110,6 +110,10 @@
1796 file_copies = """
1797 function(doc) {
1798 if (doc.type == 'dmedia/file' && doc.origin == 'user') {
1799+ if (typeof doc.stored != 'object' || isArray(doc.stored)) {
1800+ emit(0, null);
1801+ return;
1802+ }
1803 var total = 0;
1804 var key, copies;
1805 for (key in doc.stored) {
1806@@ -146,6 +150,27 @@
1807 }
1808 """
1809
1810+file_preempt = """
1811+function(doc) {
1812+ if (doc.type == 'dmedia/file' && doc.origin == 'user') {
1813+ if (typeof doc.stored != 'object' || isArray(doc.stored)) {
1814+ return;
1815+ }
1816+ var total = 0;
1817+ var key, copies;
1818+ for (key in doc.stored) {
1819+ copies = doc.stored[key].copies;
1820+ if (typeof copies == 'number' && copies > 0) {
1821+ total += copies;
1822+ }
1823+ }
1824+ if (total == 3) {
1825+ emit(doc.atime, null);
1826+ }
1827+ }
1828+}
1829+"""
1830+
1831 file_fragile = """
1832 function(doc) {
1833 if (doc.type == 'dmedia/file' && doc.origin == 'user') {
1834@@ -302,7 +327,8 @@
1835 'stored': {'map': file_stored, 'reduce': _stats},
1836 'nonzero': {'map': file_nonzero},
1837 'copies': {'map': file_copies},
1838- 'rank': {'map': file_rank},
1839+ 'rank': {'map': file_rank, 'reduce': _count},
1840+ 'preempt': {'map': file_preempt},
1841 'fragile': {'map': file_fragile},
1842 'downgrade-by-mtime': {'map': file_downgrade_by_mtime},
1843 'downgrade-by-verified': {'map': file_downgrade_by_verified},
1844@@ -332,6 +358,16 @@
1845 }
1846 """
1847
1848+store_bytes_avail = """
1849+function(doc) {
1850+ if (doc.type == 'dmedia/store') {
1851+ if (typeof doc.bytes_avail == 'number') {
1852+ emit(doc.bytes_avail, null);
1853+ }
1854+ }
1855+}
1856+"""
1857+
1858 store_drive_serial = """
1859 function(doc) {
1860 if (doc.type == 'dmedia/store') {
1861@@ -344,6 +380,7 @@
1862 '_id': '_design/store',
1863 'views': {
1864 'atime': {'map': store_atime},
1865+ 'bytes_avail': {'map': store_bytes_avail},
1866 'drive_serial': {'map': store_drive_serial},
1867 },
1868 }
