Merge lp:~exarkun/divmod.org/pop3-grabber-deletes into lp:divmod.org
Status: | Work in progress |
---|---|
Proposed branch: | lp:~exarkun/divmod.org/pop3-grabber-deletes |
Merge into: | lp:divmod.org |
Diff against target: | 909 lines (+611/-85), 4 files modified: Quotient/xquotient/grabber.py (+166/-51); Quotient/xquotient/test/historic/stub_pop3uid1to2.py (+37/-0); Quotient/xquotient/test/historic/test_pop3uid1to2.py (+32/-0); Quotient/xquotient/test/test_grabber.py (+376/-34) |
To merge this branch: | bzr merge lp:~exarkun/divmod.org/pop3-grabber-deletes |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Divmod-dev | | | Pending |
Description of the change
This branch implements deletion of messages from POP3 servers once at least one week (POP3Grabber.DELETE_DELAY) has passed since Quotient grabbed them.
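The selection rule can be modelled in plain Python. This is a hypothetical, self-contained sketch, not the Axiom query the branch actually runs: `records` stands in for this grabber's POP3UID items as a mapping of UID to `(retrieved, failed)`, and `should_delete` is an illustrative name. It mirrors the behaviour of `shouldDelete` in the diff: a UID qualifies only if this grabber retrieved it successfully more than `DELETE_DELAY` ago, and at most 996 candidates are considered per call.

```python
from datetime import datetime, timedelta

# Mirrors POP3Grabber.DELETE_DELAY in the diff.
DELETE_DELAY = timedelta(days=7)

# Mirrors the 996-item cap in shouldDelete (999 SQLite variables minus 3).
MAX_CANDIDATES = 996


def should_delete(uid_list, records, now, delay=DELETE_DELAY):
    """
    Return the (index, uid) pairs from uid_list that may now be deleted.

    uid_list: (index, uid) pairs as reported by the server's UIDL response.
    records: hypothetical mapping of uid -> (retrieved, failed) for this
        grabber only; other grabbers' state is simply absent from it.
    """
    cutoff = now - delay
    # Anything past the cap is left for a later session.
    candidates = uid_list[:MAX_CANDIDATES]
    deletable = set()
    for _, uid in candidates:
        record = records.get(uid)
        if record is None:
            continue  # never downloaded: leave it on the server
        retrieved, failed = record
        # Failed downloads never qualify, however old they are.
        if not failed and retrieved < cutoff:
            deletable.add(uid)
    return [pair for pair in candidates if pair[1] in deletable]
```

For example, with `now = datetime(2013, 1, 10)`, a UID retrieved on 2013-01-01 qualifies, one retrieved on 2013-01-09 does not, and neither does a failed or unknown UID.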
Unmerged revisions
- 2713. By Jean-Paul Calderone
  Limit the number of results from any single call to shouldDelete to around 1000. This avoids tripping over a SQLite3 limitation, and limiting the number of results is better for large mailboxes anyway.
- 2712. By Jean-Paul Calderone
  Remove the index on the boolean `failed` column, which is of questionable value; removing it causes no test failures.
- 2711. By Jean-Paul Calderone
  Query complexity tests for shouldDelete, and a compound index to make them pass.
- 2710. By Jean-Paul Calderone
  Avoid grabbing before the POP3UID upgrade is complete.
- 2709. By Jean-Paul Calderone
  Upgrader for old POP3UID items.
- 2708. By Jean-Paul Calderone
  Fix markDeleted to disregard unrelated grabber state.
- 2707. By Jean-Paul Calderone
  Fix shouldDelete to only pay attention to our own POP3UIDs.
- 2706. By Jean-Paul Calderone
  Handle timeouts/lost connections during the DELE command.
- 2705. By Jean-Paul Calderone
  Delete the POP3UID objects once the messages are deleted from the server.
- 2704. By Jean-Paul Calderone
  Kind of gross test for the protocol integration, and the simple implementation change to make it pass.
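Revisions 2711 and 2713 concern the SQL behind shouldDelete. The sketch below illustrates both ideas with Python's standard `sqlite3` module against a hypothetical table (`pop3uid` with columns `grabber_id`, `value`, `retrieved`, `failed`), not Axiom's generated schema. It creates a compound index on `(grabber_id, retrieved)` analogous to the branch's `compoundIndex(grabberID, retrieved)`, and caps the `IN` list so the total number of bound parameters stays at 999, the default `SQLITE_MAX_VARIABLE_NUMBER` for SQLite releases before 3.32.0 (newer builds default to 32766).

```python
import sqlite3

# Default SQLITE_MAX_VARIABLE_NUMBER for SQLite releases before 3.32.0.
SQLITE_VARIABLE_LIMIT = 999
# grabber_id, the retrieved cutoff and failed each use one variable.
OTHER_VARIABLES = 3
MAX_UIDS = SQLITE_VARIABLE_LIMIT - OTHER_VARIABLES  # 996


def make_db():
    """Create an in-memory database with a hypothetical pop3uid table."""
    db = sqlite3.connect(":memory:")
    db.execute(
        "CREATE TABLE pop3uid ("
        " grabber_id TEXT NOT NULL, value BLOB NOT NULL,"
        " retrieved REAL NOT NULL, failed INTEGER NOT NULL DEFAULT 0)")
    # Compound index: equality on grabber_id, then a range on retrieved.
    db.execute(
        "CREATE INDEX pop3uid_grabber_retrieved"
        " ON pop3uid (grabber_id, retrieved)")
    return db


def deletable_values(db, grabber_id, cutoff, uids):
    """Return the subset of uids (at most MAX_UIDS) old enough to delete."""
    uids = list(uids)[:MAX_UIDS]
    if not uids:
        return set()
    placeholders = ", ".join("?" * len(uids))
    sql = (
        "SELECT value FROM pop3uid"
        " WHERE grabber_id = ? AND retrieved < ? AND failed = ?"
        " AND value IN (%s)" % (placeholders,))
    rows = db.execute(sql, [grabber_id, cutoff, 0] + uids)
    return {row[0] for row in rows}
```

Truncating rather than chunking matches the branch's choice: UIDs beyond the cap are simply considered again on a later run.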
Preview Diff
1 | === modified file 'Quotient/xquotient/grabber.py' |
2 | --- Quotient/xquotient/grabber.py 2012-05-11 14:05:29 +0000 |
3 | +++ Quotient/xquotient/grabber.py 2013-01-02 01:49:22 +0000 |
4 | @@ -3,7 +3,7 @@ |
5 | from epsilon import hotfix |
6 | hotfix.require('twisted', 'deferredgenerator_tfailure') |
7 | |
8 | -import time, datetime |
9 | +import time, datetime, functools |
10 | |
11 | from twisted.mail import pop3, pop3client |
12 | from twisted.internet import protocol, defer, ssl, error |
13 | @@ -161,6 +161,12 @@ |
14 | |
15 | |
16 | class POP3UID(item.Item): |
17 | + schemaVersion = 2 |
18 | + |
19 | + retrieved = attributes.timestamp(doc=""" |
20 | + When this POP3 UID was retrieved (or when retrieval failed). |
21 | + """, allowNone=False) |
22 | + |
23 | grabberID = attributes.text(doc=""" |
24 | A string identifying the email-address/port parts of a |
25 | configured grabber |
26 | @@ -173,14 +179,33 @@ |
27 | failed = attributes.boolean(doc=""" |
28 | When set, indicates that an attempt was made to retrieve this UID, |
29 | but for some reason was unsuccessful. |
30 | - """, indexed=True, default=False) |
31 | - |
32 | + """, default=False) |
33 | + |
34 | + attributes.compoundIndex(grabberID, retrieved) |
35 | + |
36 | + |
37 | +def _pop3uid1to2(old): |
38 | + return old.upgradeVersion( |
39 | + POP3UID.typeName, 1, 2, |
40 | + value=old.value, failed=old.failed, grabberID=old.grabberID, |
41 | + retrieved=extime.Time()) |
42 | +registerUpgrader(_pop3uid1to2, POP3UID.typeName, 1, 2) |
43 | + |
44 | +POP3UIDv1 = item.declareLegacyItem(POP3UID.typeName, 1, dict( |
45 | + grabberID=attributes.text(indexed=True), |
46 | + value=attributes.bytes(indexed=True), |
47 | + failed=attributes.boolean(indexed=True, default=False))) |
48 | |
49 | |
50 | class POP3Grabber(item.Item): |
51 | """ |
52 | Item for retrieving email messages from a remote POP server. |
53 | """ |
54 | + DELETE_DELAY = datetime.timedelta(days=7) |
55 | + |
56 | + now = attributes.inmemory(doc=""" |
57 | + A callable returning a Time instance representing the current time. |
58 | + """) |
59 | |
60 | config = attributes.reference(doc=""" |
61 | The L{GrabberConfiguration} which created this grabber. |
62 | @@ -271,6 +296,7 @@ |
63 | self._pop3uids = None |
64 | self.running = False |
65 | self.protocol = None |
66 | + self.now = extime.Time |
67 | if self.status is None: |
68 | self.status = Status(store=self.store, message=u'idle') |
69 | |
70 | @@ -288,6 +314,14 @@ |
71 | # Don't run concurrently, ever. |
72 | if self.running: |
73 | return |
74 | + |
75 | + # Don't run while POP3UIDs are being upgraded. Any that have not yet |
76 | + # been upgraded won't be returned from query(POP3UID) calls, which will |
77 | + # confuse the logic about which messages to download. Eventually |
78 | + # they'll all be upgraded and we'll resume grabbing. |
79 | + if self.store.query(POP3UIDv1).count(): |
80 | + return |
81 | + |
82 | self.running = True |
83 | |
84 | from twisted.internet import reactor |
85 | @@ -340,10 +374,35 @@ |
86 | grabberID = property(_grabberID) |
87 | |
88 | |
89 | - def shouldRetrieve(self, uidList): |
90 | - """ |
91 | - Return a list of (index, uid) pairs from C{uidList} which have not |
92 | - already been grabbed. |
93 | + def shouldDelete(self, uidList): |
94 | + """ |
95 | + Return a list of (index, uid) pairs from C{uidList} which were |
96 | + downloaded long enough ago that they can be deleted now. |
97 | + """ |
98 | + # Find at most 996 of them. Combined with the other query variables in |
99 | + # the statement below, this reaches the SQLite3 query variable limit. |
100 | + # Any additional UIDs will be picked up in the future. |
101 | + uidList = uidList[:996] |
102 | + |
103 | + # And further limit them to POP3UIDs which were retrieved at least |
104 | + # DELETE_DELAY ago. Failed attempts do not count. |
105 | + where = attributes.AND( |
106 | + POP3UID.grabberID == self.grabberID, |
107 | + POP3UID.retrieved < self.now() - self.DELETE_DELAY, |
108 | + POP3UID.failed == False, |
109 | + POP3UID.value.oneOf([pair[1] for pair in uidList])) |
110 | + |
111 | + # Here are the server-side POP3 UIDs which we have downloaded and which |
112 | + # are old enough, so we should delete them. |
113 | + pop3uids = set(self.store.query(POP3UID, where).getColumn("value")) |
114 | + |
115 | + return [pair for pair in uidList if pair[1] in pop3uids] |
116 | + |
117 | + |
118 | + def _getPOP3UIDs(self): |
119 | + """ |
120 | + Return all the L{POP3UID} instances created by this grabber which still |
121 | + exist, perhaps from an in-memory cache. |
122 | """ |
123 | if self._pop3uids is None: |
124 | before = time.time() |
125 | @@ -352,8 +411,17 @@ |
126 | self._pop3uids = set(self.store.query(POP3UID, POP3UID.grabberID == self.grabberID).getColumn("value")) |
127 | after = time.time() |
128 | log.msg(interface=iaxiom.IStatEvent, stat_pop3uid_load_time=after - before) |
129 | + return self._pop3uids |
130 | + |
131 | + |
132 | + def shouldRetrieve(self, uidList): |
133 | + """ |
134 | + Return a list of (index, uid) pairs from C{uidList} which have not |
135 | + already been grabbed. |
136 | + """ |
137 | + pop3uids = self._getPOP3UIDs() |
138 | log.msg(interface=iaxiom.IStatEvent, stat_pop3uid_check=len(uidList)) |
139 | - return [pair for pair in uidList if pair[1] not in self._pop3uids] |
140 | + return [pair for pair in uidList if pair[1] not in pop3uids] |
141 | |
142 | |
143 | def markSuccess(self, uid, msg): |
144 | @@ -378,17 +446,33 @@ |
145 | msg.archive() |
146 | log.msg(interface=iaxiom.IStatEvent, stat_messages_grabbed=1, |
147 | userstore=self.store) |
148 | - POP3UID(store=self.store, grabberID=self.grabberID, value=uid) |
149 | + POP3UID( |
150 | + store=self.store, |
151 | + grabberID=self.grabberID, |
152 | + value=uid, |
153 | + retrieved=self.now()) |
154 | if self._pop3uids is not None: |
155 | self._pop3uids.add(uid) |
156 | |
157 | |
158 | def markFailure(self, uid, err): |
159 | - POP3UID(store=self.store, grabberID=self.grabberID, value=uid, failed=True) |
160 | + POP3UID( |
161 | + store=self.store, |
162 | + grabberID=self.grabberID, |
163 | + value=uid, |
164 | + retrieved=self.now(), |
165 | + failed=True) |
166 | if self._pop3uids is not None: |
167 | self._pop3uids.add(uid) |
168 | |
169 | |
170 | + def markDeleted(self, uid): |
171 | + where = attributes.AND( |
172 | + POP3UID.value == uid, POP3UID.grabberID == self.grabberID) |
173 | + query = self.store.query(POP3UID, where) |
174 | + query.deleteFromStore() |
175 | + |
176 | + |
177 | |
178 | class POP3GrabberProtocol(pop3.AdvancedPOP3Client): |
179 | _rate = 50 |
180 | @@ -479,6 +563,9 @@ |
181 | # All the (index, uid) pairs which should be retrieved |
182 | uidList = [] |
183 | |
184 | + # All of the (index, uid) pairs which should be deleted |
185 | + uidDeleteList = [] |
186 | + |
187 | # Consumer for listUID - adds to the working set and processes |
188 | # a batch if appropriate. |
189 | def consumeUIDLine(ent): |
190 | @@ -487,9 +574,14 @@ |
191 | processBatch() |
192 | |
193 | def processBatch(): |
194 | - L = self.shouldRetrieve(uidWorkingSet) |
195 | - L.sort() |
196 | - uidList.extend(L) |
197 | + toRetrieve = self.shouldRetrieve(uidWorkingSet) |
198 | + toRetrieve.sort() |
199 | + uidList.extend(toRetrieve) |
200 | + |
201 | + toDelete = self.shouldDelete(uidWorkingSet) |
202 | + toDelete.sort() |
203 | + uidDeleteList.extend(toDelete) |
204 | + |
205 | del uidWorkingSet[:] |
206 | |
207 | |
208 | @@ -555,6 +647,17 @@ |
209 | else: |
210 | self.markSuccess(uid, rece.message) |
211 | |
212 | + # Delete any old messages that should now be deleted |
213 | + for (index, uid) in uidDeleteList: |
214 | + d = defer.waitForDeferred(self.delete(index)) |
215 | + yield d |
216 | + try: |
217 | + d.getResult() |
218 | + except (error.ConnectionDone, error.ConnectionLost): |
219 | + return |
220 | + |
221 | + self.markDeleted(uid) |
222 | + |
223 | self.setStatus(u"Logging out...") |
224 | d = defer.waitForDeferred(self.quit()) |
225 | yield d |
226 | @@ -584,56 +687,68 @@ |
227 | |
228 | |
229 | |
230 | +def _requiresGrabberItem(f): |
231 | + """ |
232 | + Decorator for a method on ControlledPOP3GrabberProtocol which makes it safe |
233 | + to call even after the connection has been lost. |
234 | + """ |
235 | + @functools.wraps(f) |
236 | + def safe(self, *args, **kwargs): |
237 | + if self.grabber is not None: |
238 | + return self.grabber.store.transact(f, self, *args, **kwargs) |
239 | + return safe |
240 | + |
241 | + |
242 | + |
243 | class ControlledPOP3GrabberProtocol(POP3GrabberProtocol): |
244 | - def _transact(self, *a, **kw): |
245 | - return self.grabber.store.transact(*a, **kw) |
246 | - |
247 | - |
248 | + _transient = False |
249 | + def transientFailure(self, f): |
250 | + self._transient = True |
251 | + |
252 | + |
253 | + @_requiresGrabberItem |
254 | def getSource(self): |
255 | return u'pop3://' + self.grabber.grabberID |
256 | |
257 | |
258 | + @_requiresGrabberItem |
259 | def setStatus(self, msg, success=True): |
260 | - if self.grabber is not None: |
261 | - self._transact(self.grabber.status.setStatus, msg, success) |
262 | - |
263 | - |
264 | + return self.grabber.status.setStatus(msg, success) |
265 | + |
266 | + |
267 | + @_requiresGrabberItem |
268 | def shouldRetrieve(self, uidList): |
269 | - if self.grabber is not None: |
270 | - return self._transact(self.grabber.shouldRetrieve, uidList) |
271 | - |
272 | - |
273 | + return self.grabber.shouldRetrieve(uidList) |
274 | + |
275 | + |
276 | + @_requiresGrabberItem |
277 | + def shouldDelete(self, uidList): |
278 | + return self.grabber.shouldDelete(uidList) |
279 | + |
280 | + |
281 | + @_requiresGrabberItem |
282 | def createMIMEReceiver(self, source): |
283 | - if self.grabber is not None: |
284 | - def createIt(): |
285 | - agent = self.grabber.config.deliveryAgent |
286 | - return agent.createMIMEReceiver(source) |
287 | - return self._transact(createIt) |
288 | - |
289 | - |
290 | + agent = self.grabber.config.deliveryAgent |
291 | + return agent.createMIMEReceiver(source) |
292 | + |
293 | + |
294 | + @_requiresGrabberItem |
295 | def markSuccess(self, uid, msg): |
296 | - if self.grabber is not None: |
297 | - return self._transact(self.grabber.markSuccess, uid, msg) |
298 | - |
299 | - |
300 | + return self.grabber.markSuccess(uid, msg) |
301 | + |
302 | + |
303 | + @_requiresGrabberItem |
304 | def markFailure(self, uid, reason): |
305 | - if self.grabber is not None: |
306 | - return self._transact(self.grabber.markFailure, uid, reason) |
307 | - |
308 | - |
309 | + return self.grabber.markFailure(uid, reason) |
310 | + |
311 | + |
312 | + @_requiresGrabberItem |
313 | def paused(self): |
314 | - if self.grabber is not None: |
315 | - return self.grabber.paused |
316 | - |
317 | - |
318 | - _transient = False |
319 | - def transientFailure(self, f): |
320 | - self._transient = True |
321 | - |
322 | - |
323 | + return self.grabber.paused |
324 | + |
325 | + |
326 | + @_requiresGrabberItem |
327 | def stoppedRunning(self): |
328 | - if self.grabber is None: |
329 | - return |
330 | self.grabber.running = False |
331 | if self._transient: |
332 | iaxiom.IScheduler(self.grabber.store).reschedule( |
333 | |
334 | === added file 'Quotient/xquotient/test/historic/pop3uid1to2.axiom.tbz2' |
335 | Binary files Quotient/xquotient/test/historic/pop3uid1to2.axiom.tbz2 1970-01-01 00:00:00 +0000 and Quotient/xquotient/test/historic/pop3uid1to2.axiom.tbz2 2013-01-02 01:49:22 +0000 differ |
336 | === added file 'Quotient/xquotient/test/historic/stub_pop3uid1to2.py' |
337 | --- Quotient/xquotient/test/historic/stub_pop3uid1to2.py 1970-01-01 00:00:00 +0000 |
338 | +++ Quotient/xquotient/test/historic/stub_pop3uid1to2.py 2013-01-02 01:49:22 +0000 |
339 | @@ -0,0 +1,37 @@ |
340 | +# -*- test-case-name: xquotient.test.historic.test_pop3uid1to2 -*- |
341 | + |
342 | +""" |
343 | +Create stub database for upgrade of L{xquotient.grabber.POP3UID} from version 1 |
344 | +to version 2. |
345 | +""" |
346 | + |
347 | +from axiom.test.historic.stubloader import saveStub |
348 | + |
349 | +from axiom.userbase import LoginSystem |
350 | +from axiom.dependency import installOn |
351 | + |
352 | +from xquotient.grabber import POP3UID |
353 | + |
354 | +VALUE = b"12345678abcdefgh" |
355 | +FAILED = False |
356 | +GRABBER_ID = u"alice@example.com:1234" |
357 | + |
358 | +def createDatabase(s): |
359 | + """ |
360 | + Create an account in the given store and create a POP3UID item in it. |
361 | + """ |
362 | + loginSystem = LoginSystem(store=s) |
363 | + installOn(loginSystem, s) |
364 | + |
365 | + account = loginSystem.addAccount(u'testuser', u'localhost', None) |
366 | + subStore = account.avatars.open() |
367 | + |
368 | + POP3UID( |
369 | + store=subStore, |
370 | + value=VALUE, |
371 | + failed=FAILED, |
372 | + grabberID=GRABBER_ID) |
373 | + |
374 | + |
375 | +if __name__ == '__main__': |
376 | + saveStub(createDatabase, 'exarkun@twistedmatrix.com-20120913121256-tg7d6l1w3rkpfehr') |
377 | |
378 | === added file 'Quotient/xquotient/test/historic/test_pop3uid1to2.py' |
379 | --- Quotient/xquotient/test/historic/test_pop3uid1to2.py 1970-01-01 00:00:00 +0000 |
380 | +++ Quotient/xquotient/test/historic/test_pop3uid1to2.py 2013-01-02 01:49:22 +0000 |
381 | @@ -0,0 +1,32 @@ |
382 | + |
383 | +""" |
384 | +Test that a version 1 POP3UID is unchanged by the upgrade except that it gains a |
385 | +value for the new C{retrieved} attribute set to something near the current time. |
386 | +""" |
387 | + |
388 | +from epsilon.extime import Time |
389 | + |
390 | +from axiom.userbase import LoginSystem |
391 | +from axiom.test.historic.stubloader import StubbedTest |
392 | + |
393 | +from xquotient.test.historic.stub_pop3uid1to2 import VALUE, FAILED, GRABBER_ID |
394 | +from xquotient.grabber import POP3UID |
395 | + |
396 | +class POP3UIDUpgradeTestCase(StubbedTest): |
397 | + def test_attributes(self): |
398 | + loginSystem = self.store.findUnique(LoginSystem) |
399 | + account = loginSystem.accountByAddress(u'testuser', u'localhost') |
400 | + subStore = account.avatars.open() |
401 | + |
402 | + d = subStore.whenFullyUpgraded() |
403 | + def upgraded(ignored): |
404 | + [pop3uid] = list(subStore.query(POP3UID)) |
405 | + self.assertEqual(VALUE, pop3uid.value) |
406 | + self.assertEqual(FAILED, pop3uid.failed) |
407 | + self.assertEqual(GRABBER_ID, pop3uid.grabberID) |
408 | + |
409 | + # This will be close enough. |
410 | + elapsed = (Time() - pop3uid.retrieved).total_seconds() |
411 | + self.assertTrue(abs(elapsed) < 60) |
412 | + d.addCallback(upgraded) |
413 | + return d |
414 | |
415 | === modified file 'Quotient/xquotient/test/test_grabber.py' |
416 | --- Quotient/xquotient/test/test_grabber.py 2012-05-11 14:05:29 +0000 |
417 | +++ Quotient/xquotient/test/test_grabber.py 2013-01-02 01:49:22 +0000 |
418 | @@ -3,8 +3,11 @@ |
419 | |
420 | from datetime import timedelta |
421 | |
422 | +from zope.interface import directlyProvides |
423 | + |
424 | from twisted.trial import unittest |
425 | from twisted.internet import defer, error |
426 | +from twisted.internet.interfaces import ISSLTransport |
427 | from twisted.mail import pop3 |
428 | from twisted.cred import error as ecred |
429 | from twisted.test.proto_helpers import StringTransport |
430 | @@ -15,6 +18,7 @@ |
431 | from epsilon.test import iosim |
432 | |
433 | from axiom import iaxiom, store, substore, scheduler |
434 | +from axiom.test.util import QueryCounter |
435 | |
436 | from xquotient import grabber, mimepart |
437 | |
438 | @@ -50,6 +54,8 @@ |
439 | def connectionMade(self): |
440 | grabber.POP3GrabberProtocol.connectionMade(self) |
441 | self.events = [] |
442 | + self.uidsForDeletion = set() |
443 | + self.uidsNotForRetrieval = set() |
444 | |
445 | |
446 | def getSource(self): |
447 | @@ -62,8 +68,13 @@ |
448 | |
449 | |
450 | def shouldRetrieve(self, uidList): |
451 | - self.events.append(('retrieve', uidList)) |
452 | - return list(uidList) |
453 | + self.events.append(('retrieve', list(uidList))) |
454 | + return [pair for pair in uidList if pair[1] not in self.uidsNotForRetrieval] |
455 | + |
456 | + |
457 | + def shouldDelete(self, uidList): |
458 | + self.events.append(('delete', list(uidList))) |
459 | + return [pair for pair in uidList if pair[1] in self.uidsForDeletion] |
460 | |
461 | |
462 | def createMIMEReceiver(self, source): |
463 | @@ -80,6 +91,10 @@ |
464 | self.events.append(('failure', uid, reason)) |
465 | |
466 | |
467 | + def markDeleted(self, uid): |
468 | + self.events.append(('markDeleted', uid)) |
469 | + |
470 | + |
471 | def paused(self): |
472 | self.events.append(('paused',)) |
473 | return False |
474 | @@ -229,6 +244,52 @@ |
475 | 'stopped') |
476 | |
477 | |
478 | + def test_deletion(self): |
479 | + """ |
480 | + Messages indicated by C{shouldDelete} to be ready for deletion are |
481 | + deleted using the I{DELE} POP3 protocol action. |
482 | + """ |
483 | + transport = StringTransport() |
484 | + # Convince the client to log in |
485 | + directlyProvides(transport, ISSLTransport) |
486 | + |
487 | + self.client.makeConnection(transport) |
488 | + self.addCleanup(self.client.connectionLost, error.ConnectionLost("Simulated")) |
489 | + |
490 | + self.client.uidsForDeletion.add(b'xyz') |
491 | + self.client.uidsNotForRetrieval.add(b'abc') |
492 | + self.client.uidsNotForRetrieval.add(b'xyz') |
493 | + |
494 | + # Server greeting |
495 | + self.client.dataReceived("+OK Hello\r\n") |
496 | + # CAPA response |
497 | + self.client.dataReceived("+OK\r\nUSER\r\nUIDL\r\n.\r\n") |
498 | + # USER response |
499 | + self.client.dataReceived("+OK\r\n") |
500 | + # PASS response |
501 | + self.client.dataReceived("+OK\r\n") |
502 | + |
503 | + del self.client.events[:] |
504 | + transport.clear() |
505 | + |
506 | + # UIDL response |
507 | + self.client.dataReceived('+OK \r\n1 abc\r\n3 xyz\r\n.\r\n') |
508 | + |
509 | + # Protocol should consult shouldDelete with the UIDs and start issuing |
510 | + # delete commands. |
511 | + self.assertEquals( |
512 | + [('delete', [(0, 'abc'), (2, 'xyz')])], |
513 | + [event for event in self.client.events if event[0] == 'delete']) |
514 | + self.assertEqual("DELE 3\r\n", transport.value()) |
515 | + |
516 | + del self.client.events[:] |
517 | + |
518 | + # DELE response |
519 | + self.client.dataReceived("+OK\r\n") |
520 | + |
521 | + self.assertEquals(('markDeleted', 'xyz'), self.client.events[0]) |
522 | + |
523 | + |
524 | def testLineTooLong(self): |
525 | """ |
526 | Make sure a message illegally served with a line longer than we will |
527 | @@ -400,21 +461,38 @@ |
528 | self.assertTrue(scheduled[0] <= extime.Time()) |
529 | |
530 | |
531 | + def _timeoutTest(self, exchange): |
532 | + """ |
533 | + Exercise handling of a connection timeout at some phase of the |
534 | + interaction. |
535 | + """ |
536 | + transport = StringTransport() |
537 | + factory = grabber.POP3GrabberFactory(self.grabberItem, False) |
538 | + protocol = factory.buildProtocol(None) |
539 | + protocol.allowInsecureLogin = True |
540 | + protocol.makeConnection(transport) |
541 | + |
542 | + for (serverMessage, clientMessage) in exchange: |
543 | + protocol.dataReceived(serverMessage) |
544 | + self.assertEqual(clientMessage, transport.value()) |
545 | + transport.clear() |
546 | + |
547 | + protocol.timeoutConnection() |
548 | + self.assertTrue(transport.disconnecting) |
549 | + protocol.connectionLost(Failure(error.ConnectionLost("Simulated"))) |
550 | + |
551 | + self.assertEqual( |
552 | + self.grabberItem.status.message, |
553 | + u"Timed out waiting for server response.") |
554 | + |
555 | + |
556 | def test_stoppedRunningAfterTimeout(self): |
557 | """ |
558 | When L{ControlledPOP3GrabberProtocol} times out the connection |
559 | due to inactivity, the controlling grabber's status is set to |
560 | reflect this. |
561 | """ |
562 | - factory = grabber.POP3GrabberFactory(self.grabberItem, False) |
563 | - protocol = factory.buildProtocol(None) |
564 | - protocol.makeConnection(StringTransport()) |
565 | - protocol.timeoutConnection() |
566 | - protocol.connectionLost(Failure(error.ConnectionLost("Simulated"))) |
567 | - |
568 | - self.assertEqual( |
569 | - self.grabberItem.status.message, |
570 | - u"Timed out waiting for server response.") |
571 | + self._timeoutTest([]) |
572 | |
573 | |
574 | def test_stoppedRunningAfterListTimeout(self): |
575 | @@ -424,28 +502,53 @@ |
576 | (list UIDs) command, the controlling grabber's status is set |
577 | to reflect this. |
578 | """ |
579 | - factory = grabber.POP3GrabberFactory(self.grabberItem, False) |
580 | - protocol = factory.buildProtocol(None) |
581 | - protocol.allowInsecureLogin = True |
582 | - protocol.makeConnection(StringTransport()) |
583 | - # Server greeting |
584 | - protocol.dataReceived("+OK Hello\r\n") |
585 | - # CAPA response |
586 | - protocol.dataReceived("+OK\r\nUSER\r\nUIDL\r\n.\r\n") |
587 | - # USER response |
588 | - protocol.dataReceived("+OK\r\n") |
589 | - # PASS response |
590 | - protocol.dataReceived("+OK\r\n") |
591 | - # Sanity check, we should have gotten to sending the UIDL |
592 | - self.assertTrue( |
593 | - protocol.transport.value().endswith("\r\nUIDL\r\n"), |
594 | - "Failed to get to UIDL: %r" % (protocol.transport.value(),)) |
595 | - |
596 | - protocol.timeoutConnection() |
597 | - protocol.connectionLost(Failure(error.ConnectionLost("Simulated"))) |
598 | - self.assertEqual( |
599 | - self.grabberItem.status.message, |
600 | - u"Timed out waiting for server response.") |
601 | + self._timeoutTest([ |
602 | + # Server greeting |
603 | + (b"+OK Hello\r\n", b"CAPA\r\n"), |
604 | + # CAPA response |
605 | + (b"+OK\r\nUSER\r\nUIDL\r\n.\r\n", b"USER alice\r\n"), |
606 | + # USER response |
607 | + (b"+OK\r\n", b"PASS secret\r\n"), |
608 | + # PASS response |
609 | + (b"+OK\r\n", b"UIDL\r\n")]) |
610 | + |
611 | + |
612 | + def test_stoppedRunningAfterDeleteTimeout(self): |
613 | + # Set up some good state to want to delete |
614 | + uid = b'abc' |
615 | + delay = self.grabberItem.DELETE_DELAY |
616 | + future = extime.Time() |
617 | + now = future - delay - timedelta(seconds=1) |
618 | + self.grabberItem.now = lambda: now |
619 | + self.grabberItem.markSuccess(uid, StubMessage()) |
620 | + now = future |
621 | + |
622 | + self._timeoutTest([ |
623 | + # Server greeting |
624 | + (b"+OK Hello\r\n", b"CAPA\r\n"), |
625 | + # CAPA response |
626 | + (b"+OK\r\nUSER\r\nUIDL\r\n.\r\n", b"USER alice\r\n"), |
627 | + # USER response |
628 | + (b"+OK\r\n", b"PASS secret\r\n"), |
629 | + # PASS response |
630 | + (b"+OK\r\n", b"UIDL\r\n"), |
631 | + # UIDL response |
632 | + (b"+OK\r\n1 abc\r\n.\r\n", b"DELE 1\r\n")]) |
633 | + |
634 | + |
635 | + def test_notGrabWhileUpgrading(self): |
636 | + """ |
637 | + As long as any old (schemaVersion less than most recent) L{POP3UID} |
638 | + items remain in the database, L{POP3Grabber.grab} does not try to grab |
639 | + any messages. |
640 | + """ |
641 | + grabber.POP3UIDv1( |
642 | + store=self.userStore, |
643 | + grabberID=self.grabberItem.grabberID, |
644 | + failed=False, |
645 | + value=b'xyz') |
646 | + self.grabberItem.grab() |
647 | + self.assertFalse(self.grabberItem.running) |
648 | |
649 | |
650 | |
651 | @@ -490,7 +593,8 @@ |
652 | for i in xrange(100, 200): |
653 | grabber.POP3UID(store=self.store, |
654 | grabberID=self.grabber.grabberID, |
655 | - value=str(i)) |
656 | + value=str(i), |
657 | + retrieved=extime.Time()) |
658 | |
659 | |
660 | def testShouldRetrieve(self): |
661 | @@ -541,6 +645,34 @@ |
662 | [(49, '49'), (51, '51')]) |
663 | |
664 | |
665 | + def test_successTimestamp(self): |
666 | + """ |
667 | + The L{POP3UID} instance created by L{POP3Grabber.markSuccess} has its |
668 | + C{retrieved} attribute set to the current time as reported by |
669 | + L{POP3Grabber.now}. |
670 | + """ |
671 | + now = extime.Time() |
672 | + self.grabber.now = lambda: now |
673 | + self.grabber.markSuccess(b'123abc', StubMessage()) |
674 | + [pop3uid] = list(self.store.query( |
675 | + grabber.POP3UID, grabber.POP3UID.value == b'123abc')) |
676 | + self.assertEqual(now, pop3uid.retrieved) |
677 | + |
678 | + |
679 | + def test_failureTimestamp(self): |
680 | + """ |
681 | + The L{POP3UID} instance created by L{POP3Grabber.markFailure} has its |
682 | + C{retrieved} attribute set to the current time as reported by |
683 | + L{POP3Grabber.now}. |
684 | + """ |
685 | + now = extime.Time() |
686 | + self.grabber.now = lambda: now |
687 | + self.grabber.markFailure(b'123abc', object()) |
688 | + [pop3uid] = list(self.store.query( |
689 | + grabber.POP3UID, grabber.POP3UID.value == b'123abc')) |
690 | + self.assertEqual(now, pop3uid.retrieved) |
691 | + |
692 | + |
693 | def test_delete(self): |
694 | """ |
695 | L{POP3Grabber.delete} unschedules the grabber. |
696 | @@ -553,3 +685,213 @@ |
697 | # was scheduled either. |
698 | self.assertEqual( |
699 | [], list(store.query(scheduler.TimedEvent))) |
700 | + |
701 | + |
702 | + def test_shouldDeleteOldMessage(self): |
703 | + """ |
704 | + C{shouldDelete} accepts a list of (index, uid) pairs and returns a list |
705 | + of (index, uid) pairs corresponding to messages which may now be deleted |
706 | + from the POP3 server (due to having been downloaded more than a fixed |
707 | + number of days in the past). |
708 | + """ |
709 | + epoch = extime.Time() |
710 | + now = epoch - (self.grabber.DELETE_DELAY + timedelta(days=1)) |
711 | + |
712 | + self.grabber.now = lambda: now |
713 | + |
714 | + # Generate some state representing a past success |
715 | + oldEnough = b'123abc' |
716 | + self.grabber.markSuccess(oldEnough, StubMessage()) |
717 | + |
718 | + # Wind the clock forward far enough so that oldEnough should be |
719 | + # considered old enough for deletion. |
720 | + now = epoch |
721 | + |
722 | + self.assertEqual( |
723 | + [(3, oldEnough)], self.grabber.shouldDelete([(3, oldEnough)])) |
724 | + |
725 | + |
726 | + def test_shouldDeleteOtherGrabberState(self): |
727 | + """ |
728 | + Messages downloaded by an unrelated grabber are not considered by |
729 | + C{shouldDelete}. |
730 | + """ |
731 | + uid = b'abcdef' |
732 | + then = extime.Time() - self.grabber.DELETE_DELAY - timedelta(days=1) |
733 | + grabber.POP3UID( |
734 | + store=self.store, grabberID=u'bob@example.org:default', value=uid, |
735 | + retrieved=then) |
736 | + |
737 | + self.assertEqual([], self.grabber.shouldDelete([(5, uid)])) |
738 | + |
739 | + |
740 | + |
741 | + def test_shouldDeleteNewMessage(self): |
742 | + """ |
743 | + Messages downloaded less than a fixed number of days in the past are not |
744 | + indicated as deletable by C{shouldDelete}. |
745 | + """ |
746 | + epoch = extime.Time() |
747 | + now = epoch - (self.grabber.DELETE_DELAY - timedelta(days=1)) |
748 | + |
749 | + self.grabber.now = lambda: now |
750 | + |
751 | + # Generate some state representing a *recently* past success |
752 | + newEnough = b'xyz123' |
753 | + self.grabber.markSuccess(newEnough, StubMessage()) |
754 | + |
755 | + # Wind the clock forward, but not so far forward that newEnough is |
756 | + # considered old enough for deletion. |
757 | + now = epoch |
758 | + |
759 | + self.assertEqual( |
760 | + [], self.grabber.shouldDelete([(5, newEnough)])) |
761 | + |
762 | + |
763 | + def test_shouldDeleteFailedMessage(self): |
764 | + """ |
765 | + Messages for which the download failed are not indicated as deletable by |
766 | + C{shouldDelete}. |
767 | + """ |
768 | + epoch = extime.Time() |
769 | + now = epoch - (self.grabber.DELETE_DELAY + timedelta(days=1)) |
770 | + |
771 | + self.grabber.now = lambda: now |
772 | + |
773 | + # Generate some state representing a past failure |
774 | + failed = b'xyz123' |
775 | + self.grabber.markFailure(failed, object()) |
776 | + |
777 | + # Wind the clock forward enough so that the failed message would be old |
778 | + # enough - if it had been a success. |
779 | + now = epoch |
780 | + |
781 | + self.assertEqual( |
782 | + [], self.grabber.shouldDelete([(7, failed)])) |
783 | + |
784 | + |
785 | + def test_shouldDeleteUnknownMessage(self): |
786 | + """ |
787 | + Messages which have not been downloaded are not indicated as deletable |
788 | + by C{shouldDelete}. |
789 | + """ |
790 | + self.assertEqual( |
791 | + [], self.grabber.shouldDelete([(7, b'9876wxyz')])) |
792 | + |
793 | + |
794 | + def test_shouldDeleteMessageLimit(self): |
795 | + """ |
796 | + At most around 1000 (the exact value of the limit will be imposed by |
797 | + SQLite3) messages are considered for deletion by C{shouldDelete}. |
798 | + """ |
799 | + epoch = extime.Time() |
800 | + now = epoch - (self.grabber.DELETE_DELAY + timedelta(days=1)) |
801 | + self.grabber.now = lambda: now |
802 | + |
803 | + uidList = [] |
804 | + for i in range(1100): |
805 | + uid = b'%dabc' % (i,) |
806 | + uidList.append((i, uid)) |
807 | + self.grabber.markSuccess(uid, StubMessage()) |
808 | + |
809 | + # Spin the clock forward so all those messages are considered deletable. |
810 | + now = epoch |
811 | + self.assertEqual( |
812 | + uidList[:996], self.grabber.shouldDelete(uidList)) |
813 | + |
814 | + |
815 | + def test_now(self): |
816 | + """ |
817 | + L{POP3Grabber.now} returns the current time. |
818 | + """ |
819 | + self.assertTrue(extime.Time() <= self.grabber.now()) |
820 | + self.assertTrue(self.grabber.now() <= extime.Time()) |
821 | + |
822 | + |
823 | + def test_markDeleted(self): |
824 | + """ |
825 | + L{POP3Grabber.markDeleted} deletes the L{POP3UID} corresponding to the |
826 | + message UID passed in. |
827 | + """ |
828 | + uid = b'abcdef' |
829 | + self.grabber.markSuccess(uid, StubMessage()) |
830 | + self.grabber.markDeleted(uid) |
831 | + persistentUIDs = list(self.store.query( |
832 | + grabber.POP3UID, grabber.POP3UID.value == uid)) |
833 | + self.assertEqual([], persistentUIDs) |
834 | + |
835 | + |
836 | + def test_markDeletedOtherGrabber(self): |
837 | + """ |
838 | + L{POP3Grabber.markDeleted} does not delete a L{POP3UID} with a matching |
839 | + message UID but which belongs to a different grabber. |
840 | + """ |
841 | + uid = b'abcdef' |
842 | + pop3uid = grabber.POP3UID( |
843 | + store=self.store, |
844 | + grabberID=u'bob@example.org:default', |
845 | + value=uid, |
846 | + retrieved=extime.Time()) |
847 | + self.grabber.markDeleted(uid) |
848 | + persistentUIDs = list(self.store.query( |
849 | + grabber.POP3UID, grabber.POP3UID.value == uid)) |
850 | + self.assertEqual([pop3uid], persistentUIDs) |
851 | + |
852 | + |
853 | + |
854 | +class ShouldDeleteComplexityTests(unittest.TestCase): |
855 | + """ |
856 | + Tests for the query complexity of L{POP3Grabber.shouldDelete}. |
857 | + """ |
858 | + def test_otherGrabber(self): |
859 | + """ |
860 | + The database complexity of L{POP3Grabber.shouldDelete} is independent of |
861 | + the number of L{POP3UID} items which belong to another L{POP3Grabber}. |
862 | + """ |
863 | + self._complexityTest( |
864 | + lambda grabberItem: grabber.POP3UID( |
865 | + store=grabberItem.store, retrieved=extime.Time(), failed=False, |
866 | + grabberID=grabberItem.grabberID + b'unrelated', value=b'123')) |
867 | + |
868 | + |
869 | + def test_shouldNotDelete(self): |
870 | + """ |
871 | + The database complexity of L{POP3Grabber.shouldDelete} is independent of |
872 | + the number of L{POP3UID} items which exist in the database but do not |
873 | + yet merit deletion. |
874 | + """ |
875 | + self._complexityTest( |
876 | + lambda grabberItem: grabber.POP3UID( |
877 | + store=grabberItem.store, retrieved=extime.Time(), failed=False, |
878 | + grabberID=grabberItem.grabberID, value=b'def')) |
879 | + |
880 | + |
881 | + def _complexityTest(self, makePOP3UID): |
882 | + s = store.Store() |
883 | + counter = QueryCounter(s) |
884 | + |
885 | + config = grabber.GrabberConfiguration(store=s) |
886 | + grabberItem = grabber.POP3Grabber( |
887 | + store=s, |
888 | + config=config, |
889 | + username=u"testuser", |
890 | + domain=u"example.com", |
891 | + password=u"password") |
892 | + |
893 | + # Create at least one POP3UID, since zero-items-in-table is always |
894 | + # different from any-items-in-table. |
895 | + for i in range(5): |
896 | + grabber.POP3UID( |
897 | + store=s, retrieved=extime.Time(), failed=False, |
898 | + grabberID=grabberItem.grabberID, value=b'abc' + str(i)) |
899 | + |
900 | + fewer = counter.measure( |
901 | + lambda: grabberItem.shouldDelete([(0, b"123")])) |
902 | + |
903 | + # Create another non-matching POP3UID |
904 | + makePOP3UID(grabberItem) |
905 | + |
906 | + more = counter.measure( |
907 | + lambda: grabberItem.shouldDelete([(0, b"123")])) |
908 | + |
909 | + self.assertEqual(fewer, more) |
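The `_requiresGrabberItem` decorator in grabber.py replaces a pattern the old ControlledPOP3GrabberProtocol repeated in every method: skip the call once `self.grabber` has been cleared, otherwise run it inside `store.transact`. The model below is a self-contained sketch of that pattern; `FakeStore`, `FakeGrabber` and `Protocol` are hypothetical stand-ins for the Axiom store, POP3Grabber and the protocol class, and `FakeStore.transact` only imitates the `transact(f, *args, **kwargs)` calling convention used in the diff.

```python
import functools


def requires_grabber_item(f):
    """Run f inside the grabber's store transaction, or do nothing
    (returning None) if the grabber has already gone away."""
    @functools.wraps(f)
    def safe(self, *args, **kwargs):
        if self.grabber is not None:
            return self.grabber.store.transact(f, self, *args, **kwargs)
        return None
    return safe


class FakeStore(object):
    """Hypothetical store that counts transactions instead of opening them."""
    def __init__(self):
        self.transactions = 0

    def transact(self, f, *args, **kwargs):
        self.transactions += 1
        return f(*args, **kwargs)


class FakeGrabber(object):
    """Hypothetical grabber item with just the attributes used below."""
    def __init__(self, store):
        self.store = store
        self.paused = False


class Protocol(object):
    def __init__(self, grabber):
        self.grabber = grabber

    @requires_grabber_item
    def paused(self):
        return self.grabber.paused
```

`functools.wraps` keeps each wrapped method's `__name__` and docstring, which is presumably why the diff adds `functools` to the imports.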
Some things left to do:
- Tweak the code that decides the maximum number of messages to delete in a single session.
- Synchronize the Axiom POP3UID deletion transaction(s) with the POP3 server's message TRANSACTION state. A lost connection (e.g., from a server shutdown) during deletion leaves the messages on the server but erases Quotient's record of them, causing them to be re-downloaded: the server only removes messages marked with DELE when it enters the UPDATE state after QUIT, while markDeleted currently deletes each POP3UID as soon as its DELE is acknowledged.
- Reconsider the week-long deletion moratorium. Perhaps it is a useful feature, but *I* don't actually need it. Maybe just set it much lower for now, perhaps to an hour. A week of mail is still over 3000 messages in my mailbox.
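One possible shape for the second item, sketched synchronously with a hypothetical client object (`dele` and `quit` here are illustrative and are not the Twisted pop3client API): issue every DELE first and forget the local UID records only after QUIT has been acknowledged. RFC 1939 says that a session which ends without a client-issued QUIT never enters the UPDATE state and removes nothing, so if the connection drops earlier, keeping the records means the next run simply retries the deletions instead of re-downloading the messages.

```python
class ConnectionLost(Exception):
    """Hypothetical stand-in for a dropped connection."""


def delete_old_messages(client, to_delete, mark_deleted):
    """
    Issue DELE for each (index, uid) pair, then QUIT, and only then call
    mark_deleted for each uid. Returns the list of uids marked deleted.
    """
    try:
        for index, _ in to_delete:
            # POP3 message numbers are 1-based; UIDL indexes here are 0-based.
            client.dele(index + 1)
        client.quit()
    except ConnectionLost:
        # Without a completed QUIT the server discards the DELE marks, so
        # keep the local records and retry on the next run.
        return []
    marked = []
    for _, uid in to_delete:
        mark_deleted(uid)
        marked.append(uid)
    return marked


class FakeClient(object):
    """Hypothetical client that records commands and can drop on QUIT."""
    def __init__(self, fail_on_quit=False):
        self.commands = []
        self.fail_on_quit = fail_on_quit

    def dele(self, number):
        self.commands.append("DELE %d" % (number,))

    def quit(self):
        self.commands.append("QUIT")
        if self.fail_on_quit:
            raise ConnectionLost()
```

If QUIT succeeds on the server but its reply is lost, this approach leaves stale local records for UIDs that no longer exist on the server, which is harmless because they will never be listed again.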