Merge lp:~exarkun/divmod.org/pop3-grabber-deletes into lp:divmod.org

Proposed by Jean-Paul Calderone
Status: Work in progress
Proposed branch: lp:~exarkun/divmod.org/pop3-grabber-deletes
Merge into: lp:divmod.org
Diff against target: 909 lines (+611/-85)
4 files modified
Quotient/xquotient/grabber.py (+166/-51)
Quotient/xquotient/test/historic/stub_pop3uid1to2.py (+37/-0)
Quotient/xquotient/test/historic/test_pop3uid1to2.py (+32/-0)
Quotient/xquotient/test/test_grabber.py (+376/-34)
To merge this branch: bzr merge lp:~exarkun/divmod.org/pop3-grabber-deletes
Reviewer Review Type Date Requested Status
Divmod-dev Pending
Review via email: mp+132756@code.launchpad.net

Description of the change

This branch makes Quotient delete messages from POP3 servers once at least one week has passed since it grabbed them.

To post a comment you must log in.
Revision history for this message
Jean-Paul Calderone (exarkun) wrote :

Some things left to do:

  - Tweak the code that decides the maximum number of messages to delete in a single session
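  - Synchronize the Axiom POP3UID deletion transaction(s) with the POP3 server's TRANSACTION state. The server only removes messages marked with DELE when the session enters the UPDATE state (after QUIT), but Quotient erases each POP3UID as soon as its DELE succeeds. A connection lost during deletion (e.g., because the server shuts down) therefore leaves the messages on the server while erasing Quotient's record of them, causing them to be re-downloaded.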
  - Reconsider the week-long deletion moratorium. It may be a useful feature, but *I* don't actually need it. Maybe just set it much lower for now, perhaps to an hour: a week of mail is still over 3000 messages in my mailbox.

2713. By Jean-Paul Calderone

Limit the number of results from any single call to shouldDelete to around 1000. This avoids tripping over SQLite3's limit on the number of variables in a single query, and limiting the number of results is better for large mailboxes anyway.

Unmerged revisions

2713. By Jean-Paul Calderone

Limit the number of results from any single call to shouldDelete to around 1000. This avoids tripping over SQLite3's limit on the number of variables in a single query, and limiting the number of results is better for large mailboxes anyway.

2712. By Jean-Paul Calderone

Remove the index on the boolean `failed` column, which was of questionable value; removing it causes no test failures.

2711. By Jean-Paul Calderone

Add query complexity tests for shouldDelete, and a compound (grabberID, retrieved) index to make them pass.

2710. By Jean-Paul Calderone

Avoid grabbing before POP3UID upgrade is complete

2709. By Jean-Paul Calderone

Upgrader for old POP3UID items.

2708. By Jean-Paul Calderone

Fix markDeleted to disregard unrelated grabber state

2707. By Jean-Paul Calderone

Fix shouldDelete to only pay attention to our own POP3UIDs

2706. By Jean-Paul Calderone

Handle timeouts and lost connections during the DELE command.

2705. By Jean-Paul Calderone

Delete the POP3UID objects once the messages are deleted from the server.

2704. By Jean-Paul Calderone

Add a somewhat gross test for the protocol integration, plus the simple implementation change that makes it pass.

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'Quotient/xquotient/grabber.py'
--- Quotient/xquotient/grabber.py 2012-05-11 14:05:29 +0000
+++ Quotient/xquotient/grabber.py 2013-01-02 01:49:22 +0000
@@ -3,7 +3,7 @@
3from epsilon import hotfix3from epsilon import hotfix
4hotfix.require('twisted', 'deferredgenerator_tfailure')4hotfix.require('twisted', 'deferredgenerator_tfailure')
55
6import time, datetime6import time, datetime, functools
77
8from twisted.mail import pop3, pop3client8from twisted.mail import pop3, pop3client
9from twisted.internet import protocol, defer, ssl, error9from twisted.internet import protocol, defer, ssl, error
@@ -161,6 +161,12 @@
161161
162162
163class POP3UID(item.Item):163class POP3UID(item.Item):
164 schemaVersion = 2
165
166 retrieved = attributes.timestamp(doc="""
167 When this POP3 UID was retrieved (or when retrieval failed).
168 """, allowNone=False)
169
164 grabberID = attributes.text(doc="""170 grabberID = attributes.text(doc="""
165 A string identifying the email-address/port parts of a171 A string identifying the email-address/port parts of a
166 configured grabber172 configured grabber
@@ -173,14 +179,33 @@
173 failed = attributes.boolean(doc="""179 failed = attributes.boolean(doc="""
174 When set, indicates that an attempt was made to retrieve this UID,180 When set, indicates that an attempt was made to retrieve this UID,
175 but for some reason was unsuccessful.181 but for some reason was unsuccessful.
176 """, indexed=True, default=False)182 """, default=False)
177183
184 attributes.compoundIndex(grabberID, retrieved)
185
186
187def _pop3uid1to2(old):
188 return old.upgradeVersion(
189 POP3UID.typeName, 1, 2,
190 value=old.value, failed=old.failed, grabberID=old.grabberID,
191 retrieved=extime.Time())
192registerUpgrader(_pop3uid1to2, POP3UID.typeName, 1, 2)
193
194POP3UIDv1 = item.declareLegacyItem(POP3UID.typeName, 1, dict(
195 grabberID=attributes.text(indexed=True),
196 value=attributes.bytes(indexed=True),
197 failed=attributes.boolean(indexed=True, default=False)))
178198
179199
180class POP3Grabber(item.Item):200class POP3Grabber(item.Item):
181 """201 """
182 Item for retrieving email messages from a remote POP server.202 Item for retrieving email messages from a remote POP server.
183 """203 """
204 DELETE_DELAY = datetime.timedelta(days=7)
205
206 now = attributes.inmemory(doc="""
207 A callable returning a Time instance representing the current time.
208 """)
184209
185 config = attributes.reference(doc="""210 config = attributes.reference(doc="""
186 The L{GrabberConfiguration} which created this grabber.211 The L{GrabberConfiguration} which created this grabber.
@@ -271,6 +296,7 @@
271 self._pop3uids = None296 self._pop3uids = None
272 self.running = False297 self.running = False
273 self.protocol = None298 self.protocol = None
299 self.now = extime.Time
274 if self.status is None:300 if self.status is None:
275 self.status = Status(store=self.store, message=u'idle')301 self.status = Status(store=self.store, message=u'idle')
276302
@@ -288,6 +314,14 @@
288 # Don't run concurrently, ever.314 # Don't run concurrently, ever.
289 if self.running:315 if self.running:
290 return316 return
317
318 # Don't run while POP3UIDs are being upgraded. Any that have not yet
319 # been upgraded won't be returned from query(POP3UID) calls, which will
320 # confuse the logic about which messages to download. Eventually
321 # they'll all be upgraded and we'll resume grabbing.
322 if self.store.query(POP3UIDv1).count():
323 return
324
291 self.running = True325 self.running = True
292326
293 from twisted.internet import reactor327 from twisted.internet import reactor
@@ -340,10 +374,35 @@
340 grabberID = property(_grabberID)374 grabberID = property(_grabberID)
341375
342376
343 def shouldRetrieve(self, uidList):377 def shouldDelete(self, uidList):
344 """378 """
345 Return a list of (index, uid) pairs from C{uidList} which have not379 Return a list of (index, uid) pairs from C{uidList} which were
346 already been grabbed.380 downloaded long enough ago that they can be deleted now.
381 """
382 # Find at most 996 of them. Combined with the other query variables in
383 # the statement below, this reaches the SQLite3 query variable limit.
384 # Any additional will be picked up in the future.
385 uidList = uidList[:996]
386
387 # And further limit them to POP3UIDs which were retrieved at least
388 # DELETE_DELAY ago. Failed attempts do not count.
389 where = attributes.AND(
390 POP3UID.grabberID == self.grabberID,
391 POP3UID.retrieved < self.now() - self.DELETE_DELAY,
392 POP3UID.failed == False,
393 POP3UID.value.oneOf([pair[1] for pair in uidList]))
394
395 # Here are the server-side POP3 UIDs which we have downloaded and which
396 # are old enough, so we should delete them.
397 pop3uids = set(self.store.query(POP3UID, where).getColumn("value"))
398
399 return [pair for pair in uidList if pair[1] in pop3uids]
400
401
402 def _getPOP3UIDs(self):
403 """
404 Return all the L{POP3UID} instances created by this grabber which still
405 exist, perhaps from an in-memory cache.
347 """406 """
348 if self._pop3uids is None:407 if self._pop3uids is None:
349 before = time.time()408 before = time.time()
@@ -352,8 +411,17 @@
352 self._pop3uids = set(self.store.query(POP3UID, POP3UID.grabberID == self.grabberID).getColumn("value"))411 self._pop3uids = set(self.store.query(POP3UID, POP3UID.grabberID == self.grabberID).getColumn("value"))
353 after = time.time()412 after = time.time()
354 log.msg(interface=iaxiom.IStatEvent, stat_pop3uid_load_time=after - before)413 log.msg(interface=iaxiom.IStatEvent, stat_pop3uid_load_time=after - before)
414 return self._pop3uids
415
416
417 def shouldRetrieve(self, uidList):
418 """
419 Return a list of (index, uid) pairs from C{uidList} which have not
420 already been grabbed.
421 """
422 pop3uids = self._getPOP3UIDs()
355 log.msg(interface=iaxiom.IStatEvent, stat_pop3uid_check=len(uidList))423 log.msg(interface=iaxiom.IStatEvent, stat_pop3uid_check=len(uidList))
356 return [pair for pair in uidList if pair[1] not in self._pop3uids]424 return [pair for pair in uidList if pair[1] not in pop3uids]
357425
358426
359 def markSuccess(self, uid, msg):427 def markSuccess(self, uid, msg):
@@ -378,17 +446,33 @@
378 msg.archive()446 msg.archive()
379 log.msg(interface=iaxiom.IStatEvent, stat_messages_grabbed=1,447 log.msg(interface=iaxiom.IStatEvent, stat_messages_grabbed=1,
380 userstore=self.store)448 userstore=self.store)
381 POP3UID(store=self.store, grabberID=self.grabberID, value=uid)449 POP3UID(
450 store=self.store,
451 grabberID=self.grabberID,
452 value=uid,
453 retrieved=self.now())
382 if self._pop3uids is not None:454 if self._pop3uids is not None:
383 self._pop3uids.add(uid)455 self._pop3uids.add(uid)
384456
385457
386 def markFailure(self, uid, err):458 def markFailure(self, uid, err):
387 POP3UID(store=self.store, grabberID=self.grabberID, value=uid, failed=True)459 POP3UID(
460 store=self.store,
461 grabberID=self.grabberID,
462 value=uid,
463 retrieved=self.now(),
464 failed=True)
388 if self._pop3uids is not None:465 if self._pop3uids is not None:
389 self._pop3uids.add(uid)466 self._pop3uids.add(uid)
390467
391468
469 def markDeleted(self, uid):
470 where = attributes.AND(
471 POP3UID.value == uid, POP3UID.grabberID == self.grabberID)
472 query = self.store.query(POP3UID, where)
473 query.deleteFromStore()
474
475
392476
393class POP3GrabberProtocol(pop3.AdvancedPOP3Client):477class POP3GrabberProtocol(pop3.AdvancedPOP3Client):
394 _rate = 50478 _rate = 50
@@ -479,6 +563,9 @@
479 # All the (index, uid) pairs which should be retrieved563 # All the (index, uid) pairs which should be retrieved
480 uidList = []564 uidList = []
481565
566 # All of the (index, uid) pairs which should be deleted
567 uidDeleteList = []
568
482 # Consumer for listUID - adds to the working set and processes569 # Consumer for listUID - adds to the working set and processes
483 # a batch if appropriate.570 # a batch if appropriate.
484 def consumeUIDLine(ent):571 def consumeUIDLine(ent):
@@ -487,9 +574,14 @@
487 processBatch()574 processBatch()
488575
489 def processBatch():576 def processBatch():
490 L = self.shouldRetrieve(uidWorkingSet)577 toRetrieve = self.shouldRetrieve(uidWorkingSet)
491 L.sort()578 toRetrieve.sort()
492 uidList.extend(L)579 uidList.extend(toRetrieve)
580
581 toDelete = self.shouldDelete(uidWorkingSet)
582 toDelete.sort()
583 uidDeleteList.extend(toDelete)
584
493 del uidWorkingSet[:]585 del uidWorkingSet[:]
494586
495587
@@ -555,6 +647,17 @@
555 else:647 else:
556 self.markSuccess(uid, rece.message)648 self.markSuccess(uid, rece.message)
557649
650 # Delete any old messages that should now be deleted
651 for (index, uid) in uidDeleteList:
652 d = defer.waitForDeferred(self.delete(index))
653 yield d
654 try:
655 d.getResult()
656 except (error.ConnectionDone, error.ConnectionLost):
657 return
658
659 self.markDeleted(uid)
660
558 self.setStatus(u"Logging out...")661 self.setStatus(u"Logging out...")
559 d = defer.waitForDeferred(self.quit())662 d = defer.waitForDeferred(self.quit())
560 yield d663 yield d
@@ -584,56 +687,68 @@
584687
585688
586689
690def _requiresGrabberItem(f):
691 """
692 Decorator for a method on ControlledPOP3GrabberProtocol which makes it safe
693 to call even after the connection has been lost.
694 """
695 @functools.wraps(f)
696 def safe(self, *args, **kwargs):
697 if self.grabber is not None:
698 return self.grabber.store.transact(f, self, *args, **kwargs)
699 return safe
700
701
702
587class ControlledPOP3GrabberProtocol(POP3GrabberProtocol):703class ControlledPOP3GrabberProtocol(POP3GrabberProtocol):
588 def _transact(self, *a, **kw):704 _transient = False
589 return self.grabber.store.transact(*a, **kw)705 def transientFailure(self, f):
590706 self._transient = True
591707
708
709 @_requiresGrabberItem
592 def getSource(self):710 def getSource(self):
593 return u'pop3://' + self.grabber.grabberID711 return u'pop3://' + self.grabber.grabberID
594712
595713
714 @_requiresGrabberItem
596 def setStatus(self, msg, success=True):715 def setStatus(self, msg, success=True):
597 if self.grabber is not None:716 return self.grabber.status.setStatus(msg, success)
598 self._transact(self.grabber.status.setStatus, msg, success)717
599718
600719 @_requiresGrabberItem
601 def shouldRetrieve(self, uidList):720 def shouldRetrieve(self, uidList):
602 if self.grabber is not None:721 return self.grabber.shouldRetrieve(uidList)
603 return self._transact(self.grabber.shouldRetrieve, uidList)722
604723
605724 @_requiresGrabberItem
725 def shouldDelete(self, uidList):
726 return self.grabber.shouldDelete(uidList)
727
728
729 @_requiresGrabberItem
606 def createMIMEReceiver(self, source):730 def createMIMEReceiver(self, source):
607 if self.grabber is not None:731 agent = self.grabber.config.deliveryAgent
608 def createIt():732 return agent.createMIMEReceiver(source)
609 agent = self.grabber.config.deliveryAgent733
610 return agent.createMIMEReceiver(source)734
611 return self._transact(createIt)735 @_requiresGrabberItem
612
613
614 def markSuccess(self, uid, msg):736 def markSuccess(self, uid, msg):
615 if self.grabber is not None:737 return self.grabber.markSuccess(uid, msg)
616 return self._transact(self.grabber.markSuccess, uid, msg)738
617739
618740 @_requiresGrabberItem
619 def markFailure(self, uid, reason):741 def markFailure(self, uid, reason):
620 if self.grabber is not None:742 return self.grabber.markFailure(uid, reason)
621 return self._transact(self.grabber.markFailure, uid, reason)743
622744
623745 @_requiresGrabberItem
624 def paused(self):746 def paused(self):
625 if self.grabber is not None:747 return self.grabber.paused
626 return self.grabber.paused748
627749
628750 @_requiresGrabberItem
629 _transient = False
630 def transientFailure(self, f):
631 self._transient = True
632
633
634 def stoppedRunning(self):751 def stoppedRunning(self):
635 if self.grabber is None:
636 return
637 self.grabber.running = False752 self.grabber.running = False
638 if self._transient:753 if self._transient:
639 iaxiom.IScheduler(self.grabber.store).reschedule(754 iaxiom.IScheduler(self.grabber.store).reschedule(
640755
=== added file 'Quotient/xquotient/test/historic/pop3uid1to2.axiom.tbz2'
641Binary files Quotient/xquotient/test/historic/pop3uid1to2.axiom.tbz2 1970-01-01 00:00:00 +0000 and Quotient/xquotient/test/historic/pop3uid1to2.axiom.tbz2 2013-01-02 01:49:22 +0000 differ756Binary files Quotient/xquotient/test/historic/pop3uid1to2.axiom.tbz2 1970-01-01 00:00:00 +0000 and Quotient/xquotient/test/historic/pop3uid1to2.axiom.tbz2 2013-01-02 01:49:22 +0000 differ
=== added file 'Quotient/xquotient/test/historic/stub_pop3uid1to2.py'
--- Quotient/xquotient/test/historic/stub_pop3uid1to2.py 1970-01-01 00:00:00 +0000
+++ Quotient/xquotient/test/historic/stub_pop3uid1to2.py 2013-01-02 01:49:22 +0000
@@ -0,0 +1,37 @@
1# -*- test-case-name: xquotient.test.historic.test_pop3uid1to2 -*-
2
3"""
4Create stub database for upgrade of L{xquotient.grabber.POP3UID} from version 1
5to version 2.
6"""
7
8from axiom.test.historic.stubloader import saveStub
9
10from axiom.userbase import LoginSystem
11from axiom.dependency import installOn
12
13from xquotient.grabber import POP3UID
14
15VALUE = b"12345678abcdefgh"
16FAILED = False
17GRABBER_ID = u"alice@example.com:1234"
18
19def createDatabase(s):
20 """
21 Create an account in the given store and create a POP3UID item in it.
22 """
23 loginSystem = LoginSystem(store=s)
24 installOn(loginSystem, s)
25
26 account = loginSystem.addAccount(u'testuser', u'localhost', None)
27 subStore = account.avatars.open()
28
29 POP3UID(
30 store=subStore,
31 value=VALUE,
32 failed=FAILED,
33 grabberID=GRABBER_ID)
34
35
36if __name__ == '__main__':
37 saveStub(createDatabase, 'exarkun@twistedmatrix.com-20120913121256-tg7d6l1w3rkpfehr')
038
=== added file 'Quotient/xquotient/test/historic/test_pop3uid1to2.py'
--- Quotient/xquotient/test/historic/test_pop3uid1to2.py 1970-01-01 00:00:00 +0000
+++ Quotient/xquotient/test/historic/test_pop3uid1to2.py 2013-01-02 01:49:22 +0000
@@ -0,0 +1,32 @@
1
2"""
3Test that a version 1 POP3UID is unchanged by the upgrade except that it gains a
4value for the new C{retrieved} attribute set to something near the current time.
5"""
6
7from epsilon.extime import Time
8
9from axiom.userbase import LoginSystem
10from axiom.test.historic.stubloader import StubbedTest
11
12from xquotient.test.historic.stub_pop3uid1to2 import VALUE, FAILED, GRABBER_ID
13from xquotient.grabber import POP3UID
14
15class POP3UIDUpgradeTestCase(StubbedTest):
16 def test_attributes(self):
17 loginSystem = self.store.findUnique(LoginSystem)
18 account = loginSystem.accountByAddress(u'testuser', u'localhost')
19 subStore = account.avatars.open()
20
21 d = subStore.whenFullyUpgraded()
22 def upgraded(ignored):
23 [pop3uid] = list(subStore.query(POP3UID))
24 self.assertEqual(VALUE, pop3uid.value)
25 self.assertEqual(FAILED, pop3uid.failed)
26 self.assertEqual(GRABBER_ID, pop3uid.grabberID)
27
28 # This will be close enough.
29 elapsed = (Time() - pop3uid.retrieved).total_seconds()
30 self.assertTrue(abs(elapsed) < 60)
31 d.addCallback(upgraded)
32 return d
033
=== modified file 'Quotient/xquotient/test/test_grabber.py'
--- Quotient/xquotient/test/test_grabber.py 2012-05-11 14:05:29 +0000
+++ Quotient/xquotient/test/test_grabber.py 2013-01-02 01:49:22 +0000
@@ -3,8 +3,11 @@
33
4from datetime import timedelta4from datetime import timedelta
55
6from zope.interface import directlyProvides
7
6from twisted.trial import unittest8from twisted.trial import unittest
7from twisted.internet import defer, error9from twisted.internet import defer, error
10from twisted.internet.interfaces import ISSLTransport
8from twisted.mail import pop311from twisted.mail import pop3
9from twisted.cred import error as ecred12from twisted.cred import error as ecred
10from twisted.test.proto_helpers import StringTransport13from twisted.test.proto_helpers import StringTransport
@@ -15,6 +18,7 @@
15from epsilon.test import iosim18from epsilon.test import iosim
1619
17from axiom import iaxiom, store, substore, scheduler20from axiom import iaxiom, store, substore, scheduler
21from axiom.test.util import QueryCounter
1822
19from xquotient import grabber, mimepart23from xquotient import grabber, mimepart
2024
@@ -50,6 +54,8 @@
50 def connectionMade(self):54 def connectionMade(self):
51 grabber.POP3GrabberProtocol.connectionMade(self)55 grabber.POP3GrabberProtocol.connectionMade(self)
52 self.events = []56 self.events = []
57 self.uidsForDeletion = set()
58 self.uidsNotForRetrieval = set()
5359
5460
55 def getSource(self):61 def getSource(self):
@@ -62,8 +68,13 @@
6268
6369
64 def shouldRetrieve(self, uidList):70 def shouldRetrieve(self, uidList):
65 self.events.append(('retrieve', uidList))71 self.events.append(('retrieve', list(uidList)))
66 return list(uidList)72 return [pair for pair in uidList if pair[1] not in self.uidsNotForRetrieval]
73
74
75 def shouldDelete(self, uidList):
76 self.events.append(('delete', list(uidList)))
77 return [pair for pair in uidList if pair[1] in self.uidsForDeletion]
6778
6879
69 def createMIMEReceiver(self, source):80 def createMIMEReceiver(self, source):
@@ -80,6 +91,10 @@
80 self.events.append(('failure', uid, reason))91 self.events.append(('failure', uid, reason))
8192
8293
94 def markDeleted(self, uid):
95 self.events.append(('markDeleted', uid))
96
97
83 def paused(self):98 def paused(self):
84 self.events.append(('paused',))99 self.events.append(('paused',))
85 return False100 return False
@@ -229,6 +244,52 @@
229 'stopped')244 'stopped')
230245
231246
247 def test_deletion(self):
248 """
249 Messages indicated by C{shouldDelete} to be ready for deleted are
250 deleted using the I{DELE} POP3 protocol action.
251 """
252 transport = StringTransport()
253 # Convince the client to log in
254 directlyProvides(transport, ISSLTransport)
255
256 self.client.makeConnection(transport)
257 self.addCleanup(self.client.connectionLost, error.ConnectionLost("Simulated"))
258
259 self.client.uidsForDeletion.add(b'xyz')
260 self.client.uidsNotForRetrieval.add(b'abc')
261 self.client.uidsNotForRetrieval.add(b'xyz')
262
263 # Server greeting
264 self.client.dataReceived("+OK Hello\r\n")
265 # CAPA response
266 self.client.dataReceived("+OK\r\nUSER\r\nUIDL\r\n.\r\n")
267 # USER response
268 self.client.dataReceived("+OK\r\n")
269 # PASS response
270 self.client.dataReceived("+OK\r\n")
271
272 del self.client.events[:]
273 transport.clear()
274
275 # UIDL response
276 self.client.dataReceived('+OK \r\n1 abc\r\n3 xyz\r\n.\r\n')
277
278 # Protocol should consult shouldDelete with the UIDs and start issuing
279 # delete commands.
280 self.assertEquals(
281 [('delete', [(0, 'abc'), (2, 'xyz')])],
282 [event for event in self.client.events if event[0] == 'delete'])
283 self.assertEqual("DELE 3\r\n", transport.value())
284
285 del self.client.events[:]
286
287 # DELE response
288 self.client.dataReceived("+OK\r\n")
289
290 self.assertEquals(('markDeleted', 'xyz'), self.client.events[0])
291
292
232 def testLineTooLong(self):293 def testLineTooLong(self):
233 """294 """
234 Make sure a message illegally served with a line longer than we will295 Make sure a message illegally served with a line longer than we will
@@ -400,21 +461,38 @@
400 self.assertTrue(scheduled[0] <= extime.Time())461 self.assertTrue(scheduled[0] <= extime.Time())
401462
402463
464 def _timeoutTest(self, exchange):
465 """
466 Exercise handling of a connection timeout at some phase of the
467 interaction.
468 """
469 transport = StringTransport()
470 factory = grabber.POP3GrabberFactory(self.grabberItem, False)
471 protocol = factory.buildProtocol(None)
472 protocol.allowInsecureLogin = True
473 protocol.makeConnection(transport)
474
475 for (serverMessage, clientMessage) in exchange:
476 protocol.dataReceived(serverMessage)
477 self.assertEqual(clientMessage, transport.value())
478 transport.clear()
479
480 protocol.timeoutConnection()
481 self.assertTrue(transport.disconnecting)
482 protocol.connectionLost(Failure(error.ConnectionLost("Simulated")))
483
484 self.assertEqual(
485 self.grabberItem.status.message,
486 u"Timed out waiting for server response.")
487
488
403 def test_stoppedRunningAfterTimeout(self):489 def test_stoppedRunningAfterTimeout(self):
404 """490 """
405 When L{ControlledPOP3GrabberProtocol} times out the connection491 When L{ControlledPOP3GrabberProtocol} times out the connection
406 due to inactivity, the controlling grabber's status is set to492 due to inactivity, the controlling grabber's status is set to
407 reflect this.493 reflect this.
408 """494 """
409 factory = grabber.POP3GrabberFactory(self.grabberItem, False)495 self._timeoutTest([])
410 protocol = factory.buildProtocol(None)
411 protocol.makeConnection(StringTransport())
412 protocol.timeoutConnection()
413 protocol.connectionLost(Failure(error.ConnectionLost("Simulated")))
414
415 self.assertEqual(
416 self.grabberItem.status.message,
417 u"Timed out waiting for server response.")
418496
419497
420 def test_stoppedRunningAfterListTimeout(self):498 def test_stoppedRunningAfterListTimeout(self):
@@ -424,28 +502,53 @@
424 (list UIDs) command, the controlling grabber's status is set502 (list UIDs) command, the controlling grabber's status is set
425 to reflect this.503 to reflect this.
426 """504 """
427 factory = grabber.POP3GrabberFactory(self.grabberItem, False)505 self._timeoutTest([
428 protocol = factory.buildProtocol(None)506 # Server greeting
429 protocol.allowInsecureLogin = True507 (b"+OK Hello\r\n", b"CAPA\r\n"),
430 protocol.makeConnection(StringTransport())508 # CAPA response
431 # Server greeting509 (b"+OK\r\nUSER\r\nUIDL\r\n.\r\n", b"USER alice\r\n"),
432 protocol.dataReceived("+OK Hello\r\n") 510 # USER response
433 # CAPA response511 (b"+OK\r\n", b"PASS secret\r\n"),
434 protocol.dataReceived("+OK\r\nUSER\r\nUIDL\r\n.\r\n")512 # PASS response
435 # USER response513 (b"+OK\r\n", b"UIDL\r\n")])
436 protocol.dataReceived("+OK\r\n")514
437 # PASS response515
438 protocol.dataReceived("+OK\r\n")516 def test_stoppedRunningAfterDeleteTimeout(self):
439 # Sanity check, we should have gotten to sending the UIDL517 # Set up some good state to want to delete
440 self.assertTrue(518 uid = b'abc'
441 protocol.transport.value().endswith("\r\nUIDL\r\n"),519 delay = self.grabberItem.DELETE_DELAY
442 "Failed to get to UIDL: %r" % (protocol.transport.value(),))520 future = extime.Time()
443521 now = future - delay - timedelta(seconds=1)
444 protocol.timeoutConnection()522 self.grabberItem.now = lambda: now
445 protocol.connectionLost(Failure(error.ConnectionLost("Simulated")))523 self.grabberItem.markSuccess(uid, StubMessage())
446 self.assertEqual(524 now = future
447 self.grabberItem.status.message,525
448 u"Timed out waiting for server response.")526 self._timeoutTest([
527 # Server greeting
528 (b"+OK Hello\r\n", b"CAPA\r\n"),
529 # CAPA response
530 (b"+OK\r\nUSER\r\nUIDL\r\n.\r\n", b"USER alice\r\n"),
531 # USER response
532 (b"+OK\r\n", b"PASS secret\r\n"),
533 # PASS response
534 (b"+OK\r\n", b"UIDL\r\n"),
535 # UIDL response
536 (b"+OK\r\n1 abc\r\n.\r\n", b"DELE 1\r\n")])
537
538
539 def test_notGrabWhileUpgrading(self):
540 """
541 As long as any old (schemaVersion less than most recent) L{POP3UID}
542 items remain in the database, L{POP3Grabber.grab} does not try to grab
543 any messages.
544 """
545 grabber.POP3UIDv1(
546 store=self.userStore,
547 grabberID=self.grabberItem.grabberID,
548 failed=False,
549 value=b'xyz')
550 self.grabberItem.grab()
551 self.assertFalse(self.grabberItem.running)
449552
450553
451554
@@ -490,7 +593,8 @@
490 for i in xrange(100, 200):593 for i in xrange(100, 200):
491 grabber.POP3UID(store=self.store,594 grabber.POP3UID(store=self.store,
492 grabberID=self.grabber.grabberID,595 grabberID=self.grabber.grabberID,
493 value=str(i))596 value=str(i),
597 retrieved=extime.Time())
494598
495599
496 def testShouldRetrieve(self):600 def testShouldRetrieve(self):
@@ -541,6 +645,34 @@
541 [(49, '49'), (51, '51')])645 [(49, '49'), (51, '51')])
542646
543647
648 def test_successTimestamp(self):
649 """
650 The L{POP3UID} instance created by L{POP3Grabber.markSuccess} has its
651 C{retrieved} attribute set to the current time as reported by
652 L{POP3Grabber.now}.
653 """
654 now = extime.Time()
655 self.grabber.now = lambda: now
656 self.grabber.markSuccess(b'123abc', StubMessage())
657 [pop3uid] = list(self.store.query(
658 grabber.POP3UID, grabber.POP3UID.value == b'123abc'))
659 self.assertEqual(now, pop3uid.retrieved)
660
661
662 def test_failureTimestamp(self):
663 """
664 The L{POP3UID} instance created by L{POP3Grabber.markFailure} has its
665 C{retrieved} attribute set to the current time as reported by
666 L{POP3Grabber.now}.
667 """
668 now = extime.Time()
669 self.grabber.now = lambda: now
670 self.grabber.markFailure(b'123abc', object())
671 [pop3uid] = list(self.store.query(
672 grabber.POP3UID, grabber.POP3UID.value == b'123abc'))
673 self.assertEqual(now, pop3uid.retrieved)
674
675
544 def test_delete(self):676 def test_delete(self):
545 """677 """
546 L{POP3Grabber.delete} unschedules the grabber.678 L{POP3Grabber.delete} unschedules the grabber.
@@ -553,3 +685,213 @@
553 # was scheduled either.685 # was scheduled either.
554 self.assertEqual(686 self.assertEqual(
555 [], list(store.query(scheduler.TimedEvent)))687 [], list(store.query(scheduler.TimedEvent)))
688
689
690 def test_shouldDeleteOldMessage(self):
691 """
692 C{shouldDelete} accepts a list of (index, uid) pairs and returns a list
693 of (index, uid) pairs corresponding to messages which may now be deleted
694 from the POP3 server (due to having been downloaded more than a fixed
695 number of days in the past).
696 """
697 epoch = extime.Time()
698 now = epoch - (self.grabber.DELETE_DELAY + timedelta(days=1))
699
700 self.grabber.now = lambda: now
701
702 # Generate some state representing a past success
703 oldEnough = b'123abc'
704 self.grabber.markSuccess(oldEnough, StubMessage())
705
706 # Wind the clock forward far enough so that oldEnough should be
707 # considered old enough for deletion.
708 now = epoch
709
710 self.assertEqual(
711 [(3, oldEnough)], self.grabber.shouldDelete([(3, oldEnough)]))
712
713
714 def test_shouldDeleteOtherGrabberState(self):
715 """
716 Messages downloaded by an unrelated grabber are not considered by
717 C{shouldDelete}.
718 """
719 uid = b'abcdef'
720 then = extime.Time() - self.grabber.DELETE_DELAY - timedelta(days=1)
721 grabber.POP3UID(
722 store=self.store, grabberID=u'bob@example.org:default', value=uid,
723 retrieved=then)
724
725 self.assertEqual([], self.grabber.shouldDelete([(5, uid)]))
726
727
728
729 def test_shouldDeleteNewMessage(self):
730 """
731 Messages downloaded less than a fixed number of days in the past are not
732 indicated as deletable by C{shouldDelete}.
733 """
734 epoch = extime.Time()
735 now = epoch - (self.grabber.DELETE_DELAY - timedelta(days=1))
736
737 self.grabber.now = lambda: now
738
739 # Generate some state representing a *recently* past success
740 newEnough = b'xyz123'
741 self.grabber.markSuccess(newEnough, StubMessage())
742
743 # Wind the clock forward, but not so far forward that newEnough is
744 # considered old enough for deletion.
745 now = epoch
746
747 self.assertEqual(
748 [], self.grabber.shouldDelete([(5, newEnough)]))
749
750
751 def test_shouldDeleteFailedMessage(self):
752 """
753 Messages for which the download failed are not indicated as deletable by
754 C{shouldDelete}.
755 """
756 epoch = extime.Time()
757 now = epoch - (self.grabber.DELETE_DELAY + timedelta(days=1))
758
759 self.grabber.now = lambda: now
760
761 # Generate some state representing a past failure
762 failed = b'xyz123'
763 self.grabber.markFailure(failed, object())
764
765 # Wind the clock forward enough so that the failed message would be old
766 # enough - if it had been a success.
767 now = epoch
768
769 self.assertEqual(
770 [], self.grabber.shouldDelete([(7, failed)]))
771
772
773 def test_shouldDeleteUnknownMessage(self):
774 """
775 Messages which have not been downloaded are not indicated as deletable
776 by C{shouldDelete}.
777 """
778 self.assertEqual(
779 [], self.grabber.shouldDelete([(7, b'9876wxyz')]))
780
781
782 def test_shouldDeleteMessageLimit(self):
783 """
784 At most around 1000 (the exact value of the limit will be imposed by
785 SQLite3) messages are considered for deletion by C{shouldDelete}.
786 """
787 epoch = extime.Time()
788 now = epoch - (self.grabber.DELETE_DELAY + timedelta(days=1))
789 self.grabber.now = lambda: now
790
791 uidList = []
792 for i in range(1100):
793 uid = b'%dabc' % (i,)
794 uidList.append((i, uid))
795 self.grabber.markSuccess(uid, StubMessage())
796
797 # Spin the clock forward so all those messages are considered deletable.
798 now = epoch
799 self.assertEqual(
800 uidList[:996], self.grabber.shouldDelete(uidList))
801
802
803 def test_now(self):
804 """
805 L{POP3Grabber.now} returns the current time.
806 """
807 self.assertTrue(extime.Time() <= self.grabber.now())
808 self.assertTrue(self.grabber.now() <= extime.Time())
809
810
811 def test_markDeleted(self):
812 """
813 L{POP3Grabber.markDeleted} deletes the L{POP3UID} corresponding to the
814 message UID passed in.
815 """
816 uid = b'abcdef'
817 self.grabber.markSuccess(uid, StubMessage())
818 self.grabber.markDeleted(uid)
819 persistentUIDs = list(self.store.query(
820 grabber.POP3UID, grabber.POP3UID.value == uid))
821 self.assertEqual([], persistentUIDs)
822
823
824 def test_markDeletedOtherGrabber(self):
825 """
826 L{POP3Grabber.markDeleted} does not delete a L{POP3UID} with a matching
827 message UID but which belongs to a different grabber.
828 """
829 uid = b'abcdef'
830 pop3uid = grabber.POP3UID(
831 store=self.store,
832 grabberID=u'bob@example.org:default',
833 value=uid,
834 retrieved=extime.Time())
835 self.grabber.markDeleted(uid)
836 persistentUIDs = list(self.store.query(
837 grabber.POP3UID, grabber.POP3UID.value == uid))
838 self.assertEqual([pop3uid], persistentUIDs)
839
840
841
842class ShouldDeleteComplexityTests(unittest.TestCase):
843 """
844 Tests for the query complexity of L{POP3Grabber.shouldDelete}.
845 """
846 def test_otherGrabber(self):
847 """
848 The database complexity of L{POP3Grabber.shouldDelete} is independent of
849 the number of L{POP3UID} items which belong to another L{POP3Grabber}.
850 """
851 self._complexityTest(
852 lambda grabberItem: grabber.POP3UID(
853 store=grabberItem.store, retrieved=extime.Time(), failed=False,
854 grabberID=grabberItem.grabberID + b'unrelated', value=b'123'))
855
856
857 def test_shouldNotDelete(self):
858 """
859 The database complexity of L{POP3Grabber.shouldDelete} is independent of
860 the number of L{POP3UID} items which exist in the database but do not
861 yet merit deletion.
862 """
863 self._complexityTest(
864 lambda grabberItem: grabber.POP3UID(
865 store=grabberItem.store, retrieved=extime.Time(), failed=False,
866 grabberID=grabberItem.grabberID, value=b'def'))
867
868
869 def _complexityTest(self, makePOP3UID):
870 s = store.Store()
871 counter = QueryCounter(s)
872
873 config = grabber.GrabberConfiguration(store=s)
874 grabberItem = grabber.POP3Grabber(
875 store=s,
876 config=config,
877 username=u"testuser",
878 domain=u"example.com",
879 password=u"password")
880
881 # Create at least one POP3UID, since zero-items-in-table is always
882 # different from any-items-in-table.
883 for i in range(5):
884 grabber.POP3UID(
885 store=s, retrieved=extime.Time(), failed=False,
886 grabberID=grabberItem.grabberID, value=b'abc' + str(i))
887
888 fewer = counter.measure(
889 lambda: grabberItem.shouldDelete([b"123"]))
890
891 # Create another non-matching POP3UID
892 makePOP3UID(grabberItem)
893
894 more = counter.measure(
895 lambda: grabberItem.shouldDelete([b"123"]))
896
897 self.assertEqual(fewer, more)

Subscribers

People subscribed via source and target branches