Merge lp:~cjwatson/txpkgupload/drop-distro-files into lp:~lazr-developers/txpkgupload/trunk

Proposed by Colin Watson
Status: Merged
Merged at revision: 19
Proposed branch: lp:~cjwatson/txpkgupload/drop-distro-files
Merge into: lp:~lazr-developers/txpkgupload/trunk
Diff against target: 826 lines (+95/-462)
8 files modified
etc/txpkgupload.yaml (+5/-1)
src/txpkgupload/glock.py (+0/-316)
src/txpkgupload/hooks.py (+23/-106)
src/txpkgupload/plugin.py (+22/-6)
src/txpkgupload/tests/test_plugin.py (+23/-12)
src/txpkgupload/tests/test_twistedsftp.py (+8/-6)
src/txpkgupload/twistedftp.py (+11/-11)
src/txpkgupload/twistedsftp.py (+3/-4)
To merge this branch: bzr merge lp:~cjwatson/txpkgupload/drop-distro-files
Reviewer Review Type Date Requested Status
William Grant code Approve
Review via email: mp+247143@code.launchpad.net

Commit message

Institute a temporary directory on the same filesystem as the upload root; stop creating obsolete .distro files; remove locking, since we now move directories into place atomically.

Description of the change

Institute a temporary directory on the same filesystem as the upload root; stop creating obsolete .distro files; remove locking, since we now move directories into place atomically.

Before deploying this, we need to make sure that the tmp-incoming directory is created with appropriate permissions, and it wouldn't hurt to land an lp-production-configs change making the new directory explicit.

To post a comment you must log in.
Revision history for this message
William Grant (wgrant) :
review: Approve (code)

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'etc/txpkgupload.yaml'
--- etc/txpkgupload.yaml 2015-01-12 13:14:55 +0000
+++ etc/txpkgupload.yaml 2015-01-21 13:10:45 +0000
@@ -41,7 +41,11 @@
41# idle_timeout: 360041# idle_timeout: 3600
4242
43## Where on the filesystem do uploads live?43## Where on the filesystem do uploads live?
44# fsroot: "/var/tmp/txpkgupload"44# fsroot: "/var/tmp/txpkgupload/incoming"
45
46## Where do we write temporary files during uploads? This must be on the
47## same filesystem as fsroot.
48# temp_dir: "/var/tmp/txpkgupload/tmp-incoming"
4549
46# If true, enable additional debug logging.50# If true, enable additional debug logging.
47# debug: false51# debug: false
4852
=== removed file 'src/txpkgupload/glock.py'
--- src/txpkgupload/glock.py 2015-01-07 14:20:13 +0000
+++ src/txpkgupload/glock.py 1970-01-01 00:00:00 +0000
@@ -1,316 +0,0 @@
1#!/usr/bin/env python
2# -*- coding: latin1 -*-
3#----------------------------------------------------------------------------
4# glock.py: Global mutex
5#
6# See __doc__ string below.
7#
8# Requires:
9# - Python 1.5.2 or newer (www.python.org)
10# - On windows: win32 extensions installed
11# (http://www.python.org/windows/win32all/win32all.exe)
12# - OS: Unix, Windows.
13#
14# $Id: //depot/rgutils/rgutils/glock.py#5 $
15#----------------------------------------------------------------------------
16'''
17This module defines the class GlobalLock that implements a global
18(inter-process) mutex on Windows and Unix, using file-locking on
19Unix.
20
21@see: class L{GlobalLock} for more details.
22'''
23__version__ = '0.2.' + '$Revision: #5 $'[12:-2]
24__author__ = 'Richard Gruet', 'rjgruet@yahoo.com'
25__date__ = '$Date: 2005/06/19 $'[7:-2], '$Author: rgruet $'[9:-2]
26__since__ = '2000-01-22'
27__doc__ += '\n@author: %s (U{%s})\n@version: %s' % (__author__[0],
28 __author__[1], __version__)
29__all__ = ['GlobalLock', 'GlobalLockError', 'LockAlreadyAcquired', 'NotOwner']
30
31# Imports:
32import sys, string, os, errno, re
33
34# System-dependent imports for locking implementation:
35_windows = (sys.platform == 'win32')
36
37if _windows:
38 try:
39 import win32event, win32api, pywintypes
40 except ImportError:
41 sys.stderr.write('The win32 extensions need to be installed!')
42 try:
43 import ctypes
44 except ImportError:
45 ctypes = None
46else: # assume Unix
47 try:
48 import fcntl
49 except ImportError:
50 sys.stderr.write("On what kind of OS am I ? (Mac?) I should be on "
51 "Unix but can't import fcntl.\n")
52 raise
53 import threading
54
55# Exceptions :
56# ----------
57class GlobalLockError(Exception):
58 ''' Error raised by the glock module.
59 '''
60 pass
61
62class NotOwner(GlobalLockError):
63 ''' Attempt to release somebody else's lock.
64 '''
65 pass
66
67class LockAlreadyAcquired(GlobalLockError):
68 ''' Non-blocking acquire but lock already seized.
69 '''
70 pass
71
72
73# Constants
74# ---------:
75if sys.version[:3] < '2.2':
76 True, False = 1, 0 # built-in in Python 2.2+
77
78#----------------------------------------------------------------------------
79class GlobalLock:
80#----------------------------------------------------------------------------
81 ''' A global mutex.
82
83 B{Specification}
84
85 - The lock must act as a global mutex, ie block between different
86 candidate processus, but ALSO between different candidate
87 threads of the same process.
88
89 - It must NOT block in case of reentrant lock request issued by
90 the SAME thread.
91 - Extraneous unlocks should be ideally harmless.
92
93 B{Implementation}
94
95 In Python there is no portable global lock AFAIK. There is only a
96 LOCAL/ in-process Lock mechanism (threading.RLock), so we have to
97 implement our own solution:
98
99 - Unix: use fcntl.flock(). Recursive calls OK. Different process OK.
100 But <> threads, same process don't block so we have to use an extra
101 threading.RLock to fix that point.
102 - Windows: We use WIN32 mutex from Python Win32 extensions. Can't use
103 std module msvcrt.locking(), because global lock is OK, but
104 blocks also for 2 calls from the same thread!
105 '''
106 RE_ERROR_MSG = re.compile ("^\[Errno ([0-9]+)\]")
107
108 def __init__(self, fpath, lockInitially=False, logger=None):
109 ''' Creates (or opens) a global lock.
110
111 @param fpath: Path of the file used as lock target. This is also
112 the global id of the lock. The file will be created
113 if non existent.
114 @param lockInitially: if True locks initially.
115 @param logger: an optional logger object.
116 '''
117 self.logger = logger
118 self.fpath = fpath
119 if os.path.exists(fpath):
120 self.previous_lockfile_present = True
121 else:
122 self.previous_lockfile_present = False
123 if _windows:
124 self.name = string.replace(fpath, '\\', '_')
125 self.mutex = win32event.CreateMutex(None, lockInitially, self.name)
126 else: # Unix
127 self.name = fpath
128 self.flock = open(fpath, 'w')
129 self.fdlock = self.flock.fileno()
130 self.threadLock = threading.RLock()
131 if lockInitially:
132 self.acquire()
133
134 def __del__(self):
135 #print '__del__ called' ##
136 try: self.release()
137 except: pass
138 if _windows:
139 win32api.CloseHandle(self.mutex)
140 else:
141 try: self.flock.close()
142 except: pass
143
144 def __repr__(self):
145 return '<Global lock @ %s>' % self.name
146
147 def acquire(self, blocking=False):
148 """ Locks. Attemps to acquire a lock.
149
150 @param blocking: If True, suspends caller until done. Otherwise,
151 LockAlreadyAcquired is raised if the lock cannot be acquired immediately.
152
153 On windows an IOError is always raised after ~10 sec if the lock
154 can't be acquired.
155 @exception GlobalLockError: if lock can't be acquired (timeout)
156 @exception LockAlreadyAcquired: someone already has the lock and
157 the caller decided not to block.
158 """
159 if self.logger:
160 self.logger.info('Creating lockfile: %s', self.fpath)
161 if _windows:
162 if blocking:
163 timeout = win32event.INFINITE
164 else:
165 timeout = 0
166 r = win32event.WaitForSingleObject(self.mutex, timeout)
167 if r == win32event.WAIT_FAILED:
168 raise GlobalLockError("Can't acquire mutex: error")
169 if not blocking and r == win32event.WAIT_TIMEOUT:
170 raise LockAlreadyAcquired('Lock %s already acquired by '
171 'someone else' % self.name)
172 else:
173 # First, acquire the global (inter-process) lock:
174 if blocking:
175 options = fcntl.LOCK_EX
176 else:
177 options = fcntl.LOCK_EX|fcntl.LOCK_NB
178 try:
179 fcntl.flock(self.fdlock, options)
180 except IOError as message: #(errno 13: perm. denied,
181 # 36: Resource deadlock avoided)
182 if not blocking and self._errnoOf (message) == errno.EWOULDBLOCK:
183 raise LockAlreadyAcquired('Lock %s already acquired by '
184 'someone else' % self.name)
185 else:
186 raise GlobalLockError('Cannot acquire lock on "file" '
187 '%s: %s\n' % (self.name, message))
188 #print 'got file lock.' ##
189
190 # Then acquire the local (inter-thread) lock:
191 if not self.threadLock.acquire(blocking):
192 fcntl.flock(self.fdlock, fcntl.LOCK_UN) # release global lock
193 raise LockAlreadyAcquired('Lock %s already acquired by '
194 'someone else' % self.name)
195 if self.previous_lockfile_present and self.logger:
196 self.logger.warn("Stale lockfile detected and claimed.")
197 #print 'got thread lock.' ##
198
199 self.is_locked = True
200
201 def release(self, skip_delete=False):
202 ''' Unlocks. (caller must own the lock!)
203
204 @param skip_delete: don't try to delete the file. This can
205 be used when the original filename has changed; for
206 instance, if the lockfile is erased out-of-band, or if
207 the directory it contains has been renamed.
208
209 @return: The lock count.
210 @exception IOError: if file lock can't be released
211 @exception NotOwner: Attempt to release somebody else's lock.
212 '''
213 if not self.is_locked:
214 return
215 if not skip_delete:
216 if self.logger:
217 self.logger.debug('Removing lock file: %s', self.fpath)
218 os.unlink(self.fpath)
219 elif self.logger:
220 # At certain times the lockfile will have been removed or
221 # moved away before we call release(); log a message because
222 # this is unusual and could be an error.
223 self.logger.debug('Oops, my lock file disappeared: %s', self.fpath)
224 if _windows:
225 if ctypes:
226 result = ctypes.windll.kernel32.ReleaseMutex(self.mutex.handle)
227 if not result:
228 raise NotOwner("Attempt to release somebody else's lock")
229 else:
230 try:
231 win32event.ReleaseMutex(self.mutex)
232 #print "released mutex"
233 except pywintypes.error as e:
234 errCode, fctName, errMsg = e.args
235 if errCode == 288:
236 raise NotOwner("Attempt to release somebody else's lock")
237 else:
238 raise GlobalLockError('%s: err#%d: %s' % (fctName, errCode,
239 errMsg))
240 else:
241 # First release the local (inter-thread) lock:
242 try:
243 self.threadLock.release()
244 except AssertionError:
245 raise NotOwner("Attempt to release somebody else's lock")
246
247 # Then release the global (inter-process) lock:
248 try:
249 fcntl.flock(self.fdlock, fcntl.LOCK_UN)
250 except IOError: # (errno 13: permission denied)
251 raise GlobalLockError('Unlock of file "%s" failed\n' %
252 self.name)
253 self.is_locked = False
254
255 def _errnoOf (self, message):
256 match = self.RE_ERROR_MSG.search(str(message))
257 if match:
258 return int(match.group(1))
259 else:
260 raise Exception ('Malformed error message "%s"' % message)
261
262#----------------------------------------------------------------------------
263def test():
264#----------------------------------------------------------------------------
265 ##TODO: a more serious test with distinct processes !
266
267 print 'Testing glock.py...'
268
269 # unfortunately can't test inter-process lock here!
270 lockName = 'myFirstLock'
271 l = GlobalLock(lockName)
272 if not _windows:
273 assert os.path.exists(lockName)
274 l.acquire()
275 l.acquire() # reentrant lock, must not block
276 l.release()
277 l.release()
278
279 try: l.release()
280 except NotOwner: pass
281 else: raise Exception('should have raised a NotOwner exception')
282
283 # Check that <> threads of same process do block:
284 import threading, time
285 thread = threading.Thread(target=threadMain, args=(l,))
286 print 'main: locking...',
287 l.acquire()
288 print ' done.'
289 thread.start()
290 time.sleep(3)
291 print '\nmain: unlocking...',
292 l.release()
293 print ' done.'
294 time.sleep(0.1)
295
296 print '=> Test of glock.py passed.'
297 return l
298
299def threadMain(lock):
300 print 'thread started(%s).' % lock
301 try: lock.acquire(blocking=False)
302 except LockAlreadyAcquired: pass
303 else: raise Exception('should have raised LockAlreadyAcquired')
304 print 'thread: locking (should stay blocked for ~ 3 sec)...',
305 lock.acquire()
306 print 'thread: locking done.'
307 print 'thread: unlocking...',
308 lock.release()
309 print ' done.'
310 print 'thread ended.'
311
312#----------------------------------------------------------------------------
313# M A I N
314#----------------------------------------------------------------------------
315if __name__ == "__main__":
316 l = test()
3170
=== modified file 'src/txpkgupload/hooks.py'
--- src/txpkgupload/hooks.py 2015-01-09 15:55:08 +0000
+++ src/txpkgupload/hooks.py 2015-01-21 13:10:45 +0000
@@ -10,14 +10,10 @@
1010
1111
12import os12import os
13import shutil
14import stat
15import time13import time
1614
17from twisted.python import log15from twisted.python import log
1816
19from txpkgupload.glock import GlobalLock
20
2117
22class InterfaceFailure(Exception):18class InterfaceFailure(Exception):
23 pass19 pass
@@ -29,11 +25,8 @@
29 LOG_MAGIC = "Post-processing finished"25 LOG_MAGIC = "Post-processing finished"
30 _targetcount = 026 _targetcount = 0
3127
32 def __init__(self, targetpath, allow_user, cmd=None,28 def __init__(self, targetpath, targetstart=0, perms=None, prefix=''):
33 targetstart=0, perms=None, prefix=''):
34 self.targetpath = targetpath29 self.targetpath = targetpath
35 self.cmd = cmd
36 self.allow_user = allow_user
37 self.perms = perms30 self.perms = perms
38 self.prefix = prefix31 self.prefix = prefix
3932
@@ -53,9 +46,7 @@
53 log.msg("Session from %s:%s" % (host, port), debug=True)46 log.msg("Session from %s:%s" % (host, port), debug=True)
5447
55 def client_done_hook(self, fsroot, host, port):48 def client_done_hook(self, fsroot, host, port):
56 """A client has completed. If it authenticated then it stands a chance49 """A client has completed."""
57 of having uploaded a file to the set. If not; then it is simply an
58 aborted transaction and we remove the fsroot."""
5950
60 if fsroot not in self.clients:51 if fsroot not in self.clients:
61 raise InterfaceFailure("Unable to find fsroot in client set")52 raise InterfaceFailure("Unable to find fsroot in client set")
@@ -63,103 +54,29 @@
63 log.msg("Processing session complete in %s" % fsroot, debug=True)54 log.msg("Processing session complete in %s" % fsroot, debug=True)
6455
65 client = self.clients[fsroot]56 client = self.clients[fsroot]
66 if "distro" not in client:57
67 # Login username defines the distribution context of the upload.58 timestamp = time.strftime("%Y%m%d-%H%M%S")
68 # So abort unauthenticated sessions by removing its contents59 path = "upload%s-%s-%06d" % (
69 shutil.rmtree(fsroot)60 self.prefix, timestamp, self.targetcount)
70 return61 target_fsroot = os.path.join(self.targetpath, path)
7162
72 # Protect from race condition between creating the directory63 # Move the session directory to the target directory.
73 # and creating the distro file, and also in cases where the64 if os.path.exists(target_fsroot):
74 # temporary directory and the upload directory are not in the65 log.msg("Targeted upload already present: %s" % path)
75 # same filesystem (non-atomic "rename").66 log.msg("System clock skewed?")
76 lockfile_path = os.path.join(self.targetpath, ".lock")67 else:
77 self.lock = GlobalLock(lockfile_path)68 try:
7869 os.rename(fsroot, target_fsroot)
79 # XXX cprov 20071024 bug=156795: We try to acquire the lock as soon70 except (OSError, IOError):
80 # as possible after creating the lockfile but are still open to71 if not os.path.exists(target_fsroot):
81 # a race.72 raise
82 self.lock.acquire(blocking=True)73
83 mode = stat.S_IMODE(os.stat(lockfile_path).st_mode)74 # XXX cprov 20071024: We should replace os.system call by os.chmod
8475 # and fix the default permission value accordingly in txpkgupload
85 # XXX cprov 20081024 bug=185731: The lockfile permission can only be76 if self.perms is not None:
86 # changed by its owner. Since we can't predict which process will77 os.system("chmod %s -R %s" % (self.perms, target_fsroot))
87 # create it in production systems we simply ignore errors when trying
88 # to grant the right permission. At least, one of the process will
89 # be able to do so.
90 try:
91 os.chmod(lockfile_path, mode | stat.S_IWGRP)
92 except OSError:
93 pass
94
95 try:
96 timestamp = time.strftime("%Y%m%d-%H%M%S")
97 path = "upload%s-%s-%06d" % (
98 self.prefix, timestamp, self.targetcount)
99 target_fsroot = os.path.join(self.targetpath, path)
100
101 # Create file to store the distro used.
102 log.msg(
103 "Upload was targetted at %s" % client["distro"], debug=True)
104 distro_filename = target_fsroot + ".distro"
105 distro_file = open(distro_filename, "w")
106 distro_file.write(client["distro"])
107 distro_file.close()
108
109 # Move the session directory to the target directory.
110 if os.path.exists(target_fsroot):
111 log.msg("Targeted upload already present: %s" % path)
112 log.msg("System clock skewed ?")
113 else:
114 try:
115 shutil.move(fsroot, target_fsroot)
116 except (OSError, IOError):
117 if not os.path.exists(target_fsroot):
118 raise
119
120 # XXX cprov 20071024: We should replace os.system call by os.chmod
121 # and fix the default permission value accordingly in txpkgupload
122 if self.perms is not None:
123 os.system("chmod %s -R %s" % (self.perms, target_fsroot))
124
125 # Invoke processing script, if provided.
126 if self.cmd:
127 cmd = self.cmd
128 cmd = cmd.replace("@fsroot@", target_fsroot)
129 cmd = cmd.replace("@distro@", client["distro"])
130 log.msg("Running upload handler: %s" % cmd, debug=True)
131 os.system(cmd)
132 finally:
133 # We never delete the lockfile, this way the inode will be
134 # constant while the machine is up. See comment on 'acquire'
135 self.lock.release(skip_delete=True)
13678
137 self.clients.pop(fsroot)79 self.clients.pop(fsroot)
138 # This is mainly done so that tests know when the80 # This is mainly done so that tests know when the
139 # post-processing hook has finished.81 # post-processing hook has finished.
140 log.msg(self.LOG_MAGIC)82 log.msg(self.LOG_MAGIC)
141
142 def auth_verify_hook(self, fsroot, user, password):
143 """Verify that the username matches a distribution we care about.
144
145 The password is irrelevant to auth, as is the fsroot"""
146 if fsroot not in self.clients:
147 raise InterfaceFailure("Unable to find fsroot in client set")
148
149 # local authentication
150 self.clients[fsroot]["distro"] = self.allow_user
151 return True
152
153 # When we get on with the txpkgupload path stuff, the below may be
154 # useful and is thus left in rather than being removed.
155
156 #try:
157 # d = Distribution.byName(user)
158 # if d:
159 # log.msg("Accepting login for %s" % user, debug=True)
160 # self.clients[fsroot]["distro"] = user
161 # return True
162 #except object as e:
163 # print e
164 #return False
165
16683
=== modified file 'src/txpkgupload/plugin.py'
--- src/txpkgupload/plugin.py 2015-01-12 18:50:14 +0000
+++ src/txpkgupload/plugin.py 2015-01-21 13:10:45 +0000
@@ -7,6 +7,7 @@
7 ]7 ]
88
9from functools import partial9from functools import partial
10import os
1011
11from formencode import Schema12from formencode import Schema
12from formencode.api import set_stdtranslation13from formencode.api import set_stdtranslation
@@ -119,6 +120,10 @@
119 # Where on the filesystem do uploads live?120 # Where on the filesystem do uploads live?
120 fsroot = String(if_missing=None)121 fsroot = String(if_missing=None)
121122
123 # Where do we write temporary files during uploads? This must be on the
124 # same filesystem as fsroot.
125 temp_dir = String(if_missing=None)
126
122 @classmethod127 @classmethod
123 def parse(cls, stream):128 def parse(cls, stream):
124 """Load a YAML configuration from `stream` and validate."""129 """Load a YAML configuration from `stream` and validate."""
@@ -143,19 +148,22 @@
143 """An SSH avatar specific to txpkgupload.148 """An SSH avatar specific to txpkgupload.
144149
145 :ivar fs_root: The file system root for this session.150 :ivar fs_root: The file system root for this session.
151 :ivar temp_dir: The temporary directory for this session.
146 """152 """
147153
148 def __init__(self, user_dict, fs_root):154 def __init__(self, user_dict, fs_root, temp_dir):
149 LaunchpadAvatar.__init__(self, user_dict)155 LaunchpadAvatar.__init__(self, user_dict)
150 self.fs_root = fs_root156 self.fs_root = fs_root
157 self.temp_dir = temp_dir
151158
152159
153class Realm:160class Realm:
154 implements(IRealm)161 implements(IRealm)
155162
156 def __init__(self, authentication_proxy, fs_root):163 def __init__(self, authentication_proxy, fs_root, temp_dir):
157 self.authentication_proxy = authentication_proxy164 self.authentication_proxy = authentication_proxy
158 self.fs_root = fs_root165 self.fs_root = fs_root
166 self.temp_dir = temp_dir
159167
160 def requestAvatar(self, avatar_id, mind, *interfaces):168 def requestAvatar(self, avatar_id, mind, *interfaces):
161 # Fetch the user's details from the authserver169 # Fetch the user's details from the authserver
@@ -164,20 +172,20 @@
164172
165 # Once all those details are retrieved, we can construct the avatar.173 # Once all those details are retrieved, we can construct the avatar.
166 def got_user_dict(user_dict):174 def got_user_dict(user_dict):
167 avatar = PkgUploadAvatar(user_dict, self.fs_root)175 avatar = PkgUploadAvatar(user_dict, self.fs_root, self.temp_dir)
168 return interfaces[0], avatar, avatar.logout176 return interfaces[0], avatar, avatar.logout
169177
170 return deferred.addCallback(got_user_dict)178 return deferred.addCallback(got_user_dict)
171179
172180
173def make_portal(authentication_endpoint, fs_root):181def make_portal(authentication_endpoint, fs_root, temp_dir):
174 """Create and return a `Portal` for the SSH service.182 """Create and return a `Portal` for the SSH service.
175183
176 This portal accepts SSH credentials and returns our customized SSH184 This portal accepts SSH credentials and returns our customized SSH
177 avatars (see `LaunchpadAvatar`).185 avatars (see `LaunchpadAvatar`).
178 """186 """
179 authentication_proxy = Proxy(authentication_endpoint)187 authentication_proxy = Proxy(authentication_endpoint)
180 portal = Portal(Realm(authentication_proxy, fs_root))188 portal = Portal(Realm(authentication_proxy, fs_root, temp_dir))
181 portal.registerChecker(189 portal.registerChecker(
182 PublicKeyFromLaunchpadChecker(authentication_proxy))190 PublicKeyFromLaunchpadChecker(authentication_proxy))
183 return portal191 return portal
@@ -212,18 +220,26 @@
212 oops_dir, oops_reporter, server_argv=server_argv)220 oops_dir, oops_reporter, server_argv=server_argv)
213221
214 root = get_txpkgupload_root(config["fsroot"])222 root = get_txpkgupload_root(config["fsroot"])
223 temp_dir = config["temp_dir"]
224 if temp_dir is None:
225 temp_dir = os.path.abspath(os.path.join(
226 root, os.pardir, "tmp-incoming"))
227 if not os.path.exists(temp_dir):
228 os.makedirs(temp_dir, 0775)
215229
216 ftp_config = config["ftp"]230 ftp_config = config["ftp"]
217 ftp_service = FTPServiceFactory.makeFTPService(231 ftp_service = FTPServiceFactory.makeFTPService(
218 port=ftp_config["port"],232 port=ftp_config["port"],
219 root=root,233 root=root,
234 temp_dir=temp_dir,
220 idle_timeout=config["idle_timeout"])235 idle_timeout=config["idle_timeout"])
221 ftp_service.name = "ftp"236 ftp_service.name = "ftp"
222 ftp_service.setServiceParent(services)237 ftp_service.setServiceParent(services)
223238
224 sftp_config = config["sftp"]239 sftp_config = config["sftp"]
225 sftp_service = SSHService(240 sftp_service = SSHService(
226 portal=make_portal(sftp_config["authentication_endpoint"], root),241 portal=make_portal(
242 sftp_config["authentication_endpoint"], root, temp_dir),
227 private_key_path=sftp_config["host_key_private"],243 private_key_path=sftp_config["host_key_private"],
228 public_key_path=sftp_config["host_key_public"],244 public_key_path=sftp_config["host_key_public"],
229 main_log='txpkgupload',245 main_log='txpkgupload',
230246
=== modified file 'src/txpkgupload/tests/test_plugin.py'
--- src/txpkgupload/tests/test_plugin.py 2015-01-12 13:14:55 +0000
+++ src/txpkgupload/tests/test_plugin.py 2015-01-21 13:10:45 +0000
@@ -115,6 +115,7 @@
115 "host_key_public": None,115 "host_key_public": None,
116 "port": "tcp:5022",116 "port": "tcp:5022",
117 },117 },
118 "temp_dir": None,
118 }119 }
119 observed = Config.to_python({})120 observed = Config.to_python({})
120 self.assertEqual(expected, observed)121 self.assertEqual(expected, observed)
@@ -262,6 +263,8 @@
262263
263 def __init__(self, root_dir):264 def __init__(self, root_dir):
264 self.root_dir = root_dir265 self.root_dir = root_dir
266 self.fsroot = os.path.join(self.root_dir, "incoming")
267 self.temp_dir = os.path.join(self.root_dir, "tmp-incoming")
265 top = os.path.join(268 top = os.path.join(
266 os.path.dirname(__file__), os.pardir, os.pardir, os.pardir)269 os.path.dirname(__file__), os.pardir, os.pardir, os.pardir)
267 with open(os.path.join(top, "etc", "txpkgupload.yaml")) as stream:270 with open(os.path.join(top, "etc", "txpkgupload.yaml")) as stream:
@@ -270,8 +273,11 @@
270273
271 def setUp(self):274 def setUp(self):
272 super(FTPServer, self).setUp()275 super(FTPServer, self).setUp()
273 self.pkgupload = self.useFixture(PkgUploadFixture(276 os.mkdir(self.fsroot)
274 "fsroot: %s" % self.root_dir))277 os.mkdir(self.temp_dir)
278 self.pkgupload = self.useFixture(PkgUploadFixture(dedent("""\
279 fsroot: %s
280 temp_dir: %s""") % (self.fsroot, self.temp_dir)))
275281
276 def getAnonClient(self):282 def getAnonClient(self):
277 creator = ClientCreator(283 creator = ClientCreator(
@@ -368,6 +374,8 @@
368374
369 def __init__(self, root_dir):375 def __init__(self, root_dir):
370 self.root_dir = root_dir376 self.root_dir = root_dir
377 self.fsroot = os.path.join(self.root_dir, "incoming")
378 self.temp_dir = os.path.join(self.root_dir, "tmp-incoming")
371 #self._factory = factory379 #self._factory = factory
372 top = os.path.join(380 top = os.path.join(
373 os.path.dirname(__file__), os.pardir, os.pardir, os.pardir)381 os.path.dirname(__file__), os.pardir, os.pardir, os.pardir)
@@ -401,10 +409,14 @@
401 self.authserver_url = b"http://localhost:%d/" % self.authserver_port409 self.authserver_url = b"http://localhost:%d/" % self.authserver_port
402 self.addCleanup(self.authserver_listener.stopListening)410 self.addCleanup(self.authserver_listener.stopListening)
403 self.setUpUser('joe')411 self.setUpUser('joe')
412 os.mkdir(self.fsroot)
413 os.mkdir(self.temp_dir)
404 self.pkgupload = self.useFixture(PkgUploadFixture(dedent("""\414 self.pkgupload = self.useFixture(PkgUploadFixture(dedent("""\
405 sftp:415 sftp:
406 authentication_endpoint: %s416 authentication_endpoint: %s
407 fsroot: %s""") % (self.authserver_url, self.root_dir)))417 fsroot: %s
418 temp_dir: %s""") %
419 (self.authserver_url, self.fsroot, self.temp_dir)))
408420
409 @defer.inlineCallbacks421 @defer.inlineCallbacks
410 def getClient(self):422 def getClient(self):
@@ -460,8 +472,8 @@
460 def setUp(self):472 def setUp(self):
461 """Set up txpkgupload in a temp dir."""473 """Set up txpkgupload in a temp dir."""
462 super(TestPkgUploadServiceMakerMixin, self).setUp()474 super(TestPkgUploadServiceMakerMixin, self).setUp()
463 self.root_dir = self.useFixture(TempDir()).path475 root_dir = self.useFixture(TempDir()).path
464 self.server = self.server_factory(self.root_dir)476 self.server = self.server_factory(root_dir)
465 self.useFixture(self.server)477 self.useFixture(self.server)
466478
467 def test_init(self):479 def test_init(self):
@@ -484,9 +496,9 @@
484496
485 Only works for a single upload (txpkgupload transaction).497 Only works for a single upload (txpkgupload transaction).
486 """498 """
487 contents = sorted(os.listdir(self.root_dir))499 contents = sorted(os.listdir(self.server.fsroot))
488 upload_dir = contents[1]500 upload_dir = contents[0]
489 return os.path.join(self.root_dir, upload_dir, path)501 return os.path.join(self.server.fsroot, upload_dir, path)
490502
491 @defer.inlineCallbacks503 @defer.inlineCallbacks
492 def test_mkdir(self):504 def test_mkdir(self):
@@ -611,16 +623,15 @@
611 yield self.server.waitForClose(4)623 yield self.server.waitForClose(4)
612624
613 # Build a list of directories representing the 4 sessions.625 # Build a list of directories representing the 4 sessions.
614 upload_dirs = [leaf for leaf in sorted(os.listdir(self.root_dir))626 upload_dirs = [leaf for leaf in sorted(os.listdir(self.server.fsroot))
615 if not leaf.startswith(".") and627 if not leaf.startswith(".")]
616 not leaf.endswith(".distro")]
617 self.assertEqual(len(upload_dirs), 4)628 self.assertEqual(len(upload_dirs), 4)
618629
619 # Check the contents of files on each session.630 # Check the contents of files on each session.
620 expected_contents = ['ONE', 'TWO', 'THREE', 'FOUR']631 expected_contents = ['ONE', 'TWO', 'THREE', 'FOUR']
621 for index in range(4):632 for index in range(4):
622 content = open(os.path.join(633 content = open(os.path.join(
623 self.root_dir, upload_dirs[index], "test")).read()634 self.server.fsroot, upload_dirs[index], "test")).read()
624 self.assertEqual(content, expected_contents[index])635 self.assertEqual(content, expected_contents[index])
625636
626637
627638
=== modified file 'src/txpkgupload/tests/test_twistedsftp.py'
--- src/txpkgupload/tests/test_twistedsftp.py 2015-01-21 12:20:52 +0000
+++ src/txpkgupload/tests/test_twistedsftp.py 2015-01-21 13:10:45 +0000
@@ -16,18 +16,20 @@
1616
17class MockAvatar:17class MockAvatar:
1818
19 def __init__(self, fs_root):19 def __init__(self, fs_root, temp_dir):
20 self.fs_root = fs_root20 self.fs_root = fs_root
21 self.temp_dir = temp_dir
2122
2223
23class TestSFTPServer(testtools.TestCase):24class TestSFTPServer(testtools.TestCase):
2425
25 def setUp(self):26 def setUp(self):
26 tempdir = fixtures.TempDir()27 temp_dir = self.useFixture(fixtures.TempDir()).path
27 self.useFixture(tempdir)28 fs_root = os.path.join(temp_dir, "incoming")
28 self.useFixture(fixtures.MonkeyPatch("tempfile.tempdir", tempdir.path))29 temp_dir = os.path.join(temp_dir, "tmp-incoming")
29 self.fs_root = self.useFixture(fixtures.TempDir()).path30 os.mkdir(fs_root)
30 self.sftp_server = SFTPServer(MockAvatar(self.fs_root))31 os.mkdir(temp_dir)
32 self.sftp_server = SFTPServer(MockAvatar(fs_root, temp_dir))
31 super(TestSFTPServer, self).setUp()33 super(TestSFTPServer, self).setUp()
3234
33 def assertPermissions(self, expected, file_name):35 def assertPermissions(self, expected, file_name):
3436
=== modified file 'src/txpkgupload/twistedftp.py'
--- src/txpkgupload/twistedftp.py 2015-01-09 15:55:08 +0000
+++ src/txpkgupload/twistedftp.py 2015-01-21 13:10:45 +0000
@@ -52,15 +52,14 @@
52 Roughly equivalent to the SFTPServer in the sftp side of things.52 Roughly equivalent to the SFTPServer in the sftp side of things.
53 """53 """
5454
55 def __init__(self, fsroot):55 def __init__(self, fsroot, temp_dir):
56 self._fs_root = fsroot56 self._fs_root = fsroot
57 self.uploadfilesystem = UploadFileSystem(tempfile.mkdtemp())57 self.uploadfilesystem = UploadFileSystem(
58 tempfile.mkdtemp(dir=temp_dir))
58 self._current_upload = self.uploadfilesystem.rootpath59 self._current_upload = self.uploadfilesystem.rootpath
59 os.chmod(self._current_upload, 0770)60 os.chmod(self._current_upload, 0770)
60 self.hook = Hooks(61 self.hook = Hooks(self._fs_root, perms='g+rws', prefix='-ftp')
61 self._fs_root, "ubuntu", perms='g+rws', prefix='-ftp')
62 self.hook.new_client_hook(self._current_upload, 0, 0)62 self.hook.new_client_hook(self._current_upload, 0, 0)
63 self.hook.auth_verify_hook(self._current_upload, None, None)
64 super(AnonymousShell, self).__init__(63 super(AnonymousShell, self).__init__(
65 filepath.FilePath(self._current_upload))64 filepath.FilePath(self._current_upload))
6665
@@ -113,8 +112,9 @@
113 """FTP Realm that lets anyone in."""112 """FTP Realm that lets anyone in."""
114 implements(IRealm)113 implements(IRealm)
115114
116 def __init__(self, root):115 def __init__(self, root, temp_dir):
117 self.root = root116 self.root = root
117 self.temp_dir = temp_dir
118118
119 def requestAvatar(self, avatarId, mind, *interfaces):119 def requestAvatar(self, avatarId, mind, *interfaces):
120 """Return a txpkgupload avatar - that is, an "authorisation".120 """Return a txpkgupload avatar - that is, an "authorisation".
@@ -124,7 +124,7 @@
124 """124 """
125 for iface in interfaces:125 for iface in interfaces:
126 if iface is ftp.IFTPShell:126 if iface is ftp.IFTPShell:
127 avatar = AnonymousShell(self.root)127 avatar = AnonymousShell(self.root, self.temp_dir)
128 return ftp.IFTPShell, avatar, getattr(128 return ftp.IFTPShell, avatar, getattr(
129 avatar, 'logout', lambda: None)129 avatar, 'logout', lambda: None)
130 raise NotImplementedError(130 raise NotImplementedError(
@@ -134,8 +134,8 @@
134class FTPServiceFactory(service.Service):134class FTPServiceFactory(service.Service):
135 """A factory that makes an `FTPService`"""135 """A factory that makes an `FTPService`"""
136136
137 def __init__(self, port, root, idle_timeout):137 def __init__(self, port, root, temp_dir, idle_timeout):
138 realm = FTPRealm(root)138 realm = FTPRealm(root, temp_dir)
139 portal = Portal(realm)139 portal = Portal(realm)
140 portal.registerChecker(AccessCheck())140 portal.registerChecker(AccessCheck())
141 factory = ftp.FTPFactory(portal)141 factory = ftp.FTPFactory(portal)
@@ -149,7 +149,7 @@
149 self.portno = port149 self.portno = port
150150
151 @staticmethod151 @staticmethod
152 def makeFTPService(port, root, idle_timeout):152 def makeFTPService(port, root, temp_dir, idle_timeout):
153 strport = "tcp:%s" % port153 strport = "tcp:%s" % port
154 factory = FTPServiceFactory(port, root, idle_timeout)154 factory = FTPServiceFactory(port, root, temp_dir, idle_timeout)
155 return strports.service(strport, factory.ftpfactory)155 return strports.service(strport, factory.ftpfactory)
156156
=== modified file 'src/txpkgupload/twistedsftp.py'
--- src/txpkgupload/twistedsftp.py 2015-01-09 15:55:08 +0000
+++ src/txpkgupload/twistedsftp.py 2015-01-21 13:10:45 +0000
@@ -38,13 +38,12 @@
38 provideHandler(self.connectionClosed)38 provideHandler(self.connectionClosed)
39 self._avatar = avatar39 self._avatar = avatar
40 self._fs_root = avatar.fs_root40 self._fs_root = avatar.fs_root
41 self.uploadfilesystem = UploadFileSystem(tempfile.mkdtemp())41 self.uploadfilesystem = UploadFileSystem(
42 tempfile.mkdtemp(dir=avatar.temp_dir))
42 self._current_upload = self.uploadfilesystem.rootpath43 self._current_upload = self.uploadfilesystem.rootpath
43 os.chmod(self._current_upload, 0770)44 os.chmod(self._current_upload, 0770)
44 self.hook = Hooks(45 self.hook = Hooks(self._fs_root, perms='g+rws', prefix='-sftp')
45 self._fs_root, "ubuntu", perms='g+rws', prefix='-sftp')
46 self.hook.new_client_hook(self._current_upload, 0, 0)46 self.hook.new_client_hook(self._current_upload, 0, 0)
47 self.hook.auth_verify_hook(self._current_upload, None, None)
4847
49 def gotVersion(self, other_version, ext_data):48 def gotVersion(self, other_version, ext_data):
50 return {}49 return {}

Subscribers

People subscribed via source and target branches

to all changes: