Merge lp:~maxb/udd/modular-threads into lp:udd

Proposed by Max Bowsher
Status: Merged
Approved by: Vincent Ladeuil
Approved revision: 467
Merged at revision: 466
Proposed branch: lp:~maxb/udd/modular-threads
Merge into: lp:udd
Diff against target: 669 lines (+256/-258)
5 files modified
mass_import.py (+3/-5)
selftest.py (+1/-1)
udd/icommon.py (+0/-240)
udd/tests/test_threads.py (+10/-12)
udd/threads.py (+242/-0)
To merge this branch: bzr merge lp:~maxb/udd/modular-threads
Reviewer Review Type Date Requested Status
Vincent Ladeuil Approve
Ubuntu Distributed Development Developers Pending
Review via email: mp+64640@code.launchpad.net

Description of the change

A chunk of splitting up icommon.

I ran a local mass_import.py to check it still worked.

To post a comment you must log in.
Revision history for this message
John A Meinel (jameinel) wrote :

-----BEGIN PGP SIGNED MESSAGE-----
Hash: SHA1

On 6/15/2011 9:32 AM, Max Bowsher wrote:
> This includes planning and spawning imports, tracking their success or
> @@ -301,7 +302,7 @@
> # doing all the filesystem calls every milisecond but the user
> # shouldn't have to wait too long to see his modifications
> # taken into account either.
> - self.driver.stopped.wait(0.2)
> + self.driver.stopped.wait(5)
> if self.driver.stopped.isSet():
> break
> # Then check the graceful stop file
>

That is more than just moving the code around.

The rest seems fine to me, but I remember talking with vila quite a bit
about his threading patches, so I'd like him to take a look at this.

John
=:->
-----BEGIN PGP SIGNATURE-----
Version: GnuPG v1.4.9 (Cygwin)
Comment: Using GnuPG with Mozilla - http://enigmail.mozdev.org/

iEYEARECAAYFAk34nF0ACgkQJdeBCYSNAAPiwACeMTfzREOn35a2JYxSq4viAthn
yzcAn0YiAeITR4UjQxlR8JucUGr5k5Lq
=/CNm
-----END PGP SIGNATURE-----

Revision history for this message
Vincent Ladeuil (vila) wrote :

Please don't mix up refactoring and changes like:

40 - self.driver.stopped.wait(0.2)
41 + self.driver.stopped.wait(5)

What's the point of this change ? The comment above this line
clearly states that the value has been tweaked to match user
expectations. In this case, you're making the user waits up to 5
seconds before getting some feedback that the importer will stop
and there have been cases where a losa was trigger happy and
killed the job instead of letting it finish properly which lost
some valuable feedback (I don't remember the exact details, only
that killing wasn't the right solution).

I don't mind discussing this kind of change if you feel strongly
about it but let's do that in a specific proposal then.
For the rest, I prefer importing modules rather than symbols, but...
well, I won't block on this.

So feel free to land the rest.

review: Needs Fixing
Revision history for this message
Max Bowsher (maxb) wrote :

Whoops!

That was a temporary local hack whilst trying to figure out why mass_import was tight-looping for me locally. It should never have been committed.

Revision history for this message
Max Bowsher (maxb) wrote :

> I prefer importing modules rather than symbols, but...
> well, I won't block on this.

I do not have any strong preference in this case, so I am quite happy to go along with yours here.

I only feel strongly about importing symbols directly when the same symbol is used many times, and the advantage of brevity is significant.

Revision history for this message
Vincent Ladeuil (vila) wrote :

Cool, thanks.

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'mass_import.py'
2--- mass_import.py 2011-06-15 00:28:18 +0000
3+++ mass_import.py 2011-06-15 14:39:30 +0000
4@@ -17,9 +17,7 @@
5
6 from udd import icommon
7 from udd import iconfig
8-
9-#import httplib2
10-#httplib2.debuglevel = 1
11+from udd import threads
12
13
14 class WatchedFileHandler(logging.FileHandler):
15@@ -129,7 +127,7 @@
16 ''.join(traceback.format_exception(exc_class, exc_value,
17 exc_tb))))
18
19-class Importer(icommon.SubprocessMonitor):
20+class Importer(threads.SubprocessMonitor):
21
22 def __init__(self, args, package_name, job_id):
23 # FIXME: We should put the max_duration in a config file when it
24@@ -223,7 +221,7 @@
25 self.tried = set()
26
27
28-class ImportDriver(icommon.ThreadDriver):
29+class ImportDriver(threads.ThreadDriver):
30 """Monitor the ThreadedImporter.
31
32 This includes planning and spawning imports, tracking their success or
33
34=== modified file 'selftest.py'
35--- selftest.py 2011-06-14 23:58:08 +0000
36+++ selftest.py 2011-06-15 14:39:30 +0000
37@@ -44,10 +44,10 @@
38 'udd.tests.test_config',
39 'udd.tests.test_import_list',
40 'udd.tests.test_import_package',
41- 'udd.tests.test_mass_import',
42 'udd.tests.test_package_to_import',
43 'udd.tests.test_revid_database',
44 'udd.tests.test_status_database',
45+ 'udd.tests.test_threads',
46 ]))
47 self.test = suite
48
49
50=== modified file 'udd/icommon.py'
51--- udd/icommon.py 2011-06-15 06:54:16 +0000
52+++ udd/icommon.py 2011-06-15 14:39:30 +0000
53@@ -1,23 +1,17 @@
54-#!/usr/bin/python
55-
56 import cgi
57 import datetime
58 import errno
59 import fcntl
60 import operator
61 import os
62-import Queue
63 import random
64 import re
65 import shutil
66 import simplejson
67-import signal
68 import socket
69 import sqlite3
70 import StringIO
71-import subprocess
72 import sys
73-import threading
74 import time
75 import urllib
76 import urllib2
77@@ -42,12 +36,6 @@
78 )
79 from bzrlib.trace import mutter
80
81-try:
82- from bzrlib import cethread
83-except ImportError:
84- # Use our copy then
85- from udd import cethread
86-
87 from launchpadlib.credentials import Credentials
88 from launchpadlib.errors import HTTPError
89 from launchpadlib.launchpad import Launchpad, EDGE_SERVICE_ROOT
90@@ -2007,234 +1995,6 @@
91 f.close()
92
93
94-class CatchingExceptionThread(cethread.CatchingExceptionThread):
95- """Isolating features that needs to be backported to bzrlib.
96-
97- None so far.
98- """
99-
100-
101-class SubprocessMonitor(CatchingExceptionThread):
102-
103- def __init__(self, args, max_duration=None):
104- """Create a SubprocessMonitor.
105-
106- :param args: A list of arguments to give to python.
107-
108- :param max_duration: The max number of seconds the command is expected
109- to be run (elapsed time).
110- """
111- self.started = threading.Event()
112- self.stopped = threading.Event()
113- super(SubprocessMonitor, self).__init__(target=self.spawn,
114- sync_event=self.started)
115- self.args = args
116- self.proc_pid = None
117- self.retcode = None
118- self.out = self.err = None
119- self.started_at = self.ended_at = None
120- self.max_duration = max_duration
121- self.killed_with = None
122- self.killed_at = None
123-
124- def spawn(self):
125- """Spawn the python command in a subprocess."""
126- proc = subprocess.Popen(['python'] + self.args,
127- executable=sys.executable,
128- stdout=subprocess.PIPE,
129- stderr=subprocess.PIPE,
130- stdin=subprocess.PIPE)
131- self.proc_pid = proc.pid
132- self.started_at = time.time()
133- self.switch_and_set(self.stopped)
134- try:
135- self.out, self.err = proc.communicate()
136- self.ended_at = time.time()
137- self.retcode = proc.returncode
138- except OSError, e:
139- if e.errno in (errno.ECHILD, errno.ESRCH):
140- # The process doesn't exist anymore
141- pass
142- else:
143- raise
144- self.stopped.set()
145-
146- def collect(self, timeout=None):
147- """Collect relevant data before discarding the thread.
148-
149- This must be called by the controlling thread. Be aware that since this
150- calls join() it may raise the pending exception which must be cleared
151- if this behaviour is not the intended one.
152-
153- Daughter classes can override this method to collect any relevant data
154- (including the pending exception) before the thread is discarded.
155- """
156- self.join(timeout)
157-
158- def kill_with_escalation(self, grace_period=None):
159- """Attempt to kill the subprocess.
160-
161- :param grace_period: A floating number of seconds to wait before using
162- SIGKILL to kill the subprocess.
163-
164- When first called, `signal.SIGTERM` is used to allow the subprocess to
165- handle the kill cleanly. If called repeatedly, this will not try to
166- kill the process again until the end of the specified
167- `grace_period`. When the grace period expires, `signal.SIGKILL` is used
168- so that subprocesses that don't shutdown when receiving
169- `signal.SIGTERM` can still be shut down.
170-
171- This must be called by the controlling thread.
172- """
173- killed = False
174- if self.killed_with is None:
175- killed = self._kill(signal.SIGTERM)
176- else:
177- # We already tried to kill this this subprocess, if it didn't
178- # during the grace period, we'll kill it harder
179- now = time.time()
180- if grace_period is None:
181- # FIXME: This should go into a config file -- vila 2011-03-18
182- # 10 seconds should be more than enough for a process to die or
183- # something wrong is happening in which case it's better to get
184- # this problematic subprocess down as quickly as possible.
185- grace_period = 10.0
186- if now - self.killed_at > grace_period:
187- killed = self._kill(signal.SIGKILL)
188- return killed
189-
190- def _kill(self, signum):
191- killed = False
192- # Don't attempt to kill the subprocess if it hasn't been started
193- if self.proc_pid is not None:
194- try:
195- os.kill(self.proc_pid, signum)
196- killed = True
197- self.killed_with = signum
198- self.killed_at = time.time()
199- except OSError, e:
200- if e.errno in (errno.ECHILD, errno.ESRCH):
201- # The process doesn't exist anymore.
202- pass
203- else:
204- raise
205- return killed
206-
207- def has_exceeded_time_quota(self, duration=None):
208- """Is the thread running for too long ?
209-
210- :param duration: If specified will override the ``max_duration``
211- attribute.
212- """
213- if duration is None:
214- duration = self.max_duration
215- if (duration is not None
216- and self.started_at is not None and self.ended_at is None):
217- # A timeout is defined, the thread has been started but is not
218- # finished yet
219- if (time.time() - self.started_at) > self.max_duration:
220- return True
221- return False
222-
223-
224-class ThreadDriver(CatchingExceptionThread):
225-
226- def __init__(self, queue, max_threads, target=None):
227- """Create a ThreadDriver.
228-
229- :param queue: A thread provider.
230-
231- :param max_threads: Maximum number of threads allowed to run
232- concurrently.
233-
234- :param target: What callable should be run in the thread.
235- """
236- self.started = threading.Event()
237- self.stopped = threading.Event()
238- self.stop = threading.Event()
239- if target is None:
240- target = self.drive
241- super(ThreadDriver, self).__init__(target=target,
242- sync_event=self.started)
243- self.queue = queue
244- self.queue_closed = threading.Event()
245- self.max_threads = max_threads
246- self.threads = []
247-
248- def drive(self):
249- self.before_start()
250- # From now on the caller will let us run until we stop or die
251- self.switch_and_set(self.stopped)
252- while True:
253- # Calling wait ensures we can be interrupted properly, but we don't
254- # want to block so we use a timeout. We don't use 0 though or other
255- # threads may not get any cycle otherwise.
256- self.stop.wait(0.001)
257- if self.stop.isSet():
258- break
259- # Check for threads exceeding their expected lifetime
260- self.check_time_quota()
261- self.do_one_step()
262- self.before_stop()
263- self.stopped.set()
264-
265- def before_start(self):
266- pass
267-
268- def check_time_quota(self):
269- """Ensure threads don't exceed their elapsed time quota.
270-
271- :returns: The list of killed threads.
272- """
273- killed = []
274- for t in self.threads:
275- if t.has_exceeded_time_quota():
276- # Die you sucker
277- t.kill_with_escalation()
278- killed.append(t)
279- return killed
280-
281- def do_one_step(self):
282- self.collect_terminated_threads()
283- if (not self.stop.isSet() and not self.queue_closed.isSet()
284- and len(self.threads) < self.max_threads):
285- self.start_new_thread()
286- if self.queue_closed.isSet() and not self.threads:
287- # The queue is closed and there are no more running threads
288- self.stop.set()
289-
290- def before_stop(self):
291- self.kill_remaining_threads()
292-
293- def start_new_thread(self):
294- # Get a new thread from the queue and start it
295- try:
296- new_thread = self.queue.get_nowait()
297- except Queue.Empty:
298- return None
299- self.threads.append(new_thread)
300- new_thread.start()
301- new_thread.started.wait()
302- # Tell the queue we're done with this thread
303- self.queue.task_done()
304- return new_thread
305-
306- def collect_terminated_threads(self):
307- # If there are terminated threads, collect them
308- for t in self.threads[:]:
309- if not t.isAlive():
310- t.collect()
311- self.threads.remove(t)
312-
313- def kill_remaining_threads(self):
314- # Kill all the remaining threads
315- for t in self.threads[:]:
316- t.kill_with_escalation()
317- # Collect the thread whether we could kill it or not
318- t.collect()
319- self.threads.remove(t)
320-
321-
322 def refresh_possible_transports(possible_transports):
323 """Prune stale connections from possible_transports.
324
325
326=== renamed file 'udd/tests/test_mass_import.py' => 'udd/tests/test_threads.py'
327--- udd/tests/test_mass_import.py 2011-06-15 00:28:18 +0000
328+++ udd/tests/test_threads.py 2011-06-15 14:39:30 +0000
329@@ -4,11 +4,9 @@
330 import threading
331 import time
332
333-
334 from bzrlib import tests
335
336-
337-from udd import icommon
338+from udd import threads
339
340
341 def get_test_script_path(name):
342@@ -28,7 +26,7 @@
343
344 def run_in_subprocess(self, args, klass=None):
345 if klass is None:
346- klass = icommon.SubprocessMonitor
347+ klass = threads.SubprocessMonitor
348 sub = klass(args)
349 self.addCleanup(sub.join, 0)
350 sub.start()
351@@ -83,7 +81,7 @@
352 def test_kill_catch_zombie(self):
353 kill_me = threading.Event()
354 resume = threading.Event()
355- class TestMonitor(icommon.SubprocessMonitor):
356+ class TestMonitor(threads.SubprocessMonitor):
357
358 def switch_and_set(self, new):
359 super(TestMonitor, self).switch_and_set(new)
360@@ -121,7 +119,7 @@
361
362 def run_in_subprocess(self, args, klass=None):
363 if klass is None:
364- klass = icommon.SubprocessMonitor
365+ klass = threads.SubprocessMonitor
366 sub = klass(args)
367 self.addCleanup(sub.join, 0)
368 sub.start()
369@@ -187,7 +185,7 @@
370 self.assertEquals(sub.out, 'Try harder\n')
371
372
373-class Sleeper(icommon.SubprocessMonitor):
374+class Sleeper(threads.SubprocessMonitor):
375
376 def __init__(self, sleep_time=0.001, max_duration=None):
377 # sleep can't be killed (or interrupted) so we don't sleep for long but
378@@ -206,7 +204,7 @@
379 def get_driver(self, *args, **kwargs):
380 klass = kwargs.pop('klass', None)
381 if klass is None:
382- klass = icommon.ThreadDriver
383+ klass = threads.ThreadDriver
384 return klass(*args, **kwargs)
385
386 def get_started_driver(self, target=None, override=None, queue=None,
387@@ -286,7 +284,7 @@
388 self.started = threading.Event()
389 self.resumed = threading.Event()
390 self.terminated = threading.Event()
391- class TestThread(icommon.SubprocessMonitor):
392+ class TestThread(threads.SubprocessMonitor):
393
394 def collect(thread):
395 self.assertIs(self.thread, thread)
396@@ -297,7 +295,7 @@
397 q = Queue.Queue(1)
398 q.put(self.thread)
399
400- class TestDriver(icommon.ThreadDriver):
401+ class TestDriver(threads.ThreadDriver):
402
403 def start_new_thread(driver):
404 t = super(TestDriver, driver).start_new_thread()
405@@ -326,7 +324,7 @@
406 self.assertLength(0, driver.threads)
407
408 def test_thread_exception_raised_in_driver(self):
409- class TestThread(icommon.SubprocessMonitor):
410+ class TestThread(threads.SubprocessMonitor):
411
412 def collect(thread):
413 self.assertEquals(1, thread.retcode)
414@@ -436,7 +434,7 @@
415 self.addCleanup(sleeper.kill_with_escalation)
416 q.put(sleeper)
417 self.killed = None
418- class TestDriver(icommon.ThreadDriver):
419+ class TestDriver(threads.ThreadDriver):
420
421 def check_time_quota(driver):
422 killed = super(TestDriver, driver).check_time_quota()
423
424=== added file 'udd/threads.py'
425--- udd/threads.py 1970-01-01 00:00:00 +0000
426+++ udd/threads.py 2011-06-15 14:39:30 +0000
427@@ -0,0 +1,242 @@
428+import errno
429+import os
430+import Queue
431+import signal
432+import subprocess
433+import sys
434+import threading
435+import time
436+
437+try:
438+ from bzrlib import cethread
439+except ImportError:
440+ # Use our copy then
441+ from udd import cethread
442+
443+
444+class CatchingExceptionThread(cethread.CatchingExceptionThread):
445+ """Isolating features that needs to be backported to bzrlib.
446+
447+ None so far.
448+ """
449+
450+
451+class SubprocessMonitor(CatchingExceptionThread):
452+
453+ def __init__(self, args, max_duration=None):
454+ """Create a SubprocessMonitor.
455+
456+ :param args: A list of arguments to give to python.
457+
458+ :param max_duration: The max number of seconds the command is expected
459+ to be run (elapsed time).
460+ """
461+ self.started = threading.Event()
462+ self.stopped = threading.Event()
463+ super(SubprocessMonitor, self).__init__(target=self.spawn,
464+ sync_event=self.started)
465+ self.args = args
466+ self.proc_pid = None
467+ self.retcode = None
468+ self.out = self.err = None
469+ self.started_at = self.ended_at = None
470+ self.max_duration = max_duration
471+ self.killed_with = None
472+ self.killed_at = None
473+
474+ def spawn(self):
475+ """Spawn the python command in a subprocess."""
476+ proc = subprocess.Popen(['python'] + self.args,
477+ executable=sys.executable,
478+ stdout=subprocess.PIPE,
479+ stderr=subprocess.PIPE,
480+ stdin=subprocess.PIPE)
481+ self.proc_pid = proc.pid
482+ self.started_at = time.time()
483+ self.switch_and_set(self.stopped)
484+ try:
485+ self.out, self.err = proc.communicate()
486+ self.ended_at = time.time()
487+ self.retcode = proc.returncode
488+ except OSError, e:
489+ if e.errno in (errno.ECHILD, errno.ESRCH):
490+ # The process doesn't exist anymore
491+ pass
492+ else:
493+ raise
494+ self.stopped.set()
495+
496+ def collect(self, timeout=None):
497+ """Collect relevant data before discarding the thread.
498+
499+ This must be called by the controlling thread. Be aware that since this
500+ calls join() it may raise the pending exception which must be cleared
501+ if this behaviour is not the intended one.
502+
503+ Daughter classes can override this method to collect any relevant data
504+ (including the pending exception) before the thread is discarded.
505+ """
506+ self.join(timeout)
507+
508+ def kill_with_escalation(self, grace_period=None):
509+ """Attempt to kill the subprocess.
510+
511+ :param grace_period: A floating number of seconds to wait before using
512+ SIGKILL to kill the subprocess.
513+
514+ When first called, `signal.SIGTERM` is used to allow the subprocess to
515+ handle the kill cleanly. If called repeatedly, this will not try to
516+ kill the process again until the end of the specified
517+ `grace_period`. When the grace period expires, `signal.SIGKILL` is used
518+ so that subprocesses that don't shutdown when receiving
519+ `signal.SIGTERM` can still be shut down.
520+
521+ This must be called by the controlling thread.
522+ """
523+ killed = False
524+ if self.killed_with is None:
525+ killed = self._kill(signal.SIGTERM)
526+ else:
527+ # We already tried to kill this this subprocess, if it didn't
528+ # during the grace period, we'll kill it harder
529+ now = time.time()
530+ if grace_period is None:
531+ # FIXME: This should go into a config file -- vila 2011-03-18
532+ # 10 seconds should be more than enough for a process to die or
533+ # something wrong is happening in which case it's better to get
534+ # this problematic subprocess down as quickly as possible.
535+ grace_period = 10.0
536+ if now - self.killed_at > grace_period:
537+ killed = self._kill(signal.SIGKILL)
538+ return killed
539+
540+ def _kill(self, signum):
541+ killed = False
542+ # Don't attempt to kill the subprocess if it hasn't been started
543+ if self.proc_pid is not None:
544+ try:
545+ os.kill(self.proc_pid, signum)
546+ killed = True
547+ self.killed_with = signum
548+ self.killed_at = time.time()
549+ except OSError, e:
550+ if e.errno in (errno.ECHILD, errno.ESRCH):
551+ # The process doesn't exist anymore.
552+ pass
553+ else:
554+ raise
555+ return killed
556+
557+ def has_exceeded_time_quota(self, duration=None):
558+ """Is the thread running for too long ?
559+
560+ :param duration: If specified will override the ``max_duration``
561+ attribute.
562+ """
563+ if duration is None:
564+ duration = self.max_duration
565+ if (duration is not None
566+ and self.started_at is not None and self.ended_at is None):
567+ # A timeout is defined, the thread has been started but is not
568+ # finished yet
569+ if (time.time() - self.started_at) > self.max_duration:
570+ return True
571+ return False
572+
573+
574+class ThreadDriver(CatchingExceptionThread):
575+
576+ def __init__(self, queue, max_threads, target=None):
577+ """Create a ThreadDriver.
578+
579+ :param queue: A thread provider.
580+
581+ :param max_threads: Maximum number of threads allowed to run
582+ concurrently.
583+
584+ :param target: What callable should be run in the thread.
585+ """
586+ self.started = threading.Event()
587+ self.stopped = threading.Event()
588+ self.stop = threading.Event()
589+ if target is None:
590+ target = self.drive
591+ super(ThreadDriver, self).__init__(target=target,
592+ sync_event=self.started)
593+ self.queue = queue
594+ self.queue_closed = threading.Event()
595+ self.max_threads = max_threads
596+ self.threads = []
597+
598+ def drive(self):
599+ self.before_start()
600+ # From now on the caller will let us run until we stop or die
601+ self.switch_and_set(self.stopped)
602+ while True:
603+ # Calling wait ensures we can be interrupted properly, but we don't
604+ # want to block so we use a timeout. We don't use 0 though or other
605+ # threads may not get any cycle otherwise.
606+ self.stop.wait(0.001)
607+ if self.stop.isSet():
608+ break
609+ # Check for threads exceeding their expected lifetime
610+ self.check_time_quota()
611+ self.do_one_step()
612+ self.before_stop()
613+ self.stopped.set()
614+
615+ def before_start(self):
616+ pass
617+
618+ def check_time_quota(self):
619+ """Ensure threads don't exceed their elapsed time quota.
620+
621+ :returns: The list of killed threads.
622+ """
623+ killed = []
624+ for t in self.threads:
625+ if t.has_exceeded_time_quota():
626+ # Die you sucker
627+ t.kill_with_escalation()
628+ killed.append(t)
629+ return killed
630+
631+ def do_one_step(self):
632+ self.collect_terminated_threads()
633+ if (not self.stop.isSet() and not self.queue_closed.isSet()
634+ and len(self.threads) < self.max_threads):
635+ self.start_new_thread()
636+ if self.queue_closed.isSet() and not self.threads:
637+ # The queue is closed and there are no more running threads
638+ self.stop.set()
639+
640+ def before_stop(self):
641+ self.kill_remaining_threads()
642+
643+ def start_new_thread(self):
644+ # Get a new thread from the queue and start it
645+ try:
646+ new_thread = self.queue.get_nowait()
647+ except Queue.Empty:
648+ return None
649+ self.threads.append(new_thread)
650+ new_thread.start()
651+ new_thread.started.wait()
652+ # Tell the queue we're done with this thread
653+ self.queue.task_done()
654+ return new_thread
655+
656+ def collect_terminated_threads(self):
657+ # If there are terminated threads, collect them
658+ for t in self.threads[:]:
659+ if not t.isAlive():
660+ t.collect()
661+ self.threads.remove(t)
662+
663+ def kill_remaining_threads(self):
664+ # Kill all the remaining threads
665+ for t in self.threads[:]:
666+ t.kill_with_escalation()
667+ # Collect the thread whether we could kill it or not
668+ t.collect()
669+ self.threads.remove(t)

Subscribers

People subscribed via source and target branches