Merge ~cjwatson/launchpad:parallel-feed-swift into launchpad:master

Proposed by Colin Watson
Status: Merged
Approved by: Colin Watson
Approved revision: b4d7de1ff7ec31825384a7dc4cd572a50afbec1c
Merge reported by: Otto Co-Pilot
Merged at revision: not available
Proposed branch: ~cjwatson/launchpad:parallel-feed-swift
Merge into: launchpad:master
Diff against target: 168 lines (+90/-4)
3 files modified
cronscripts/librarian-feed-swift.py (+35/-3)
lib/lp/services/librarianserver/swift.py (+14/-1)
lib/lp/services/librarianserver/tests/test_swift.py (+41/-0)
Reviewer | Review Type | Date Requested | Status
Ioana Lasc (community) | | | Approve
Review via email: mp+401127@code.launchpad.net

Commit message

Support running parallel instances of librarian-feed-swift

Description of the change

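This makes it easier for the Swift feeder to keep up under higher load. With the new --instance-id and --num-instances options, several instances of librarian-feed-swift can run in parallel. Each instance migrates only the files whose LibraryFileContent ID leaves remainder INSTANCE_ID when divided by NUM_INSTANCES, and each holds its own lock file.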

To post a comment you must log in.
Revision history for this message
Ioana Lasc (ilasc) wrote :

Looks good

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1diff --git a/cronscripts/librarian-feed-swift.py b/cronscripts/librarian-feed-swift.py
2index de244c2..9da2c45 100755
3--- a/cronscripts/librarian-feed-swift.py
4+++ b/cronscripts/librarian-feed-swift.py
5@@ -47,6 +47,24 @@ class LibrarianFeedSwift(LaunchpadCronScript):
6 default=None, metavar="INTERVAL",
7 help="Don't migrate files older than INTERVAL "
8 "(PostgreSQL syntax)")
9+ self.parser.add_option(
10+ "--instance-id", action="store", type=int, default=None,
11+ metavar="INSTANCE_ID",
12+ help=(
13+ "Run as instance INSTANCE_ID (starting at 0) out of "
14+ "NUM_INSTANCES parallel workers"))
15+ self.parser.add_option(
16+ "--num-instances", action="store", type=int, default=None,
17+ metavar="NUM_INSTANCES",
18+ help="Run NUM_INSTANCES parallel workers")
19+
20+ @property
21+ def lockfilename(self):
22+ if self.options.instance_id is not None:
23+ return "launchpad-%s-%d.lock" % (
24+ self.name, self.options.instance_id)
25+ else:
26+ return "launchpad-%s.lock" % self.name
27
28 def main(self):
29 if self.options.rename and self.options.remove:
30@@ -72,17 +90,31 @@ class LibrarianFeedSwift(LaunchpadCronScript):
31 - CAST(%s AS INTERVAL)
32 """, (six.text_type(self.options.end_at),)).get_one()[0]
33
34+ if ((self.options.instance_id is None) !=
35+ (self.options.num_instances is None)):
36+ self.parser.error(
37+ "Must specify both or neither of --instance-id and "
38+ "--num-instances")
39+
40+ kwargs = {
41+ "instance_id": self.options.instance_id,
42+ "num_instances": self.options.num_instances,
43+ "remove_func": remove,
44+ }
45+
46 if self.options.ids and (self.options.start or self.options.end):
47 self.parser.error(
48 "Cannot specify both individual file(s) and range")
49
50 elif self.options.ids:
51 for lfc in self.options.ids:
52- swift.to_swift(self.logger, lfc, lfc, remove)
53+ swift.to_swift(
54+ self.logger, start_lfc_id=lfc, end_lfc_id=lfc, **kwargs)
55
56 else:
57- swift.to_swift(self.logger, self.options.start,
58- self.options.end, remove)
59+ swift.to_swift(
60+ self.logger, start_lfc_id=self.options.start,
61+ end_lfc_id=self.options.end, **kwargs)
62 self.logger.info('Done')
63
64
65diff --git a/lib/lp/services/librarianserver/swift.py b/lib/lp/services/librarianserver/swift.py
66index b45293f..23344c0 100644
67--- a/lib/lp/services/librarianserver/swift.py
68+++ b/lib/lp/services/librarianserver/swift.py
69@@ -56,12 +56,17 @@ def quiet_swiftclient(func, *args, **kwargs):
70 swiftclient.logger.disabled = old_disabled
71
72
73-def to_swift(log, start_lfc_id=None, end_lfc_id=None, remove_func=False):
74+def to_swift(log, start_lfc_id=None, end_lfc_id=None,
75+ instance_id=None, num_instances=None, remove_func=False):
76 '''Copy a range of Librarian files from disk into Swift.
77
78 start and end identify the range of LibraryFileContent.id to
79 migrate (inclusive).
80
81+ If instance_id and num_instances are set, only process files whose ID
82+ have remainder instance_id when divided by num_instances. This allows
83+ running multiple feeders in parallel.
84+
85 If remove_func is set, it is called for every file after being copied into
86 Swift.
87 '''
88@@ -76,6 +81,9 @@ def to_swift(log, start_lfc_id=None, end_lfc_id=None, remove_func=False):
89
90 log.info("Walking disk store {0} from {1} to {2}, inclusive".format(
91 fs_root, start_lfc_id, end_lfc_id))
92+ if instance_id is not None and num_instances is not None:
93+ log.info("Parallel mode: instance ID {0} of {1}".format(
94+ instance_id, num_instances))
95
96 start_fs_path = filesystem_path(start_lfc_id)
97 end_fs_path = filesystem_path(end_lfc_id)
98@@ -134,6 +142,9 @@ def to_swift(log, start_lfc_id=None, end_lfc_id=None, remove_func=False):
99 except ValueError:
100 log.warning('Invalid hex fail, skipping {0}'.format(fs_path))
101 continue
102+ if instance_id is not None and num_instances is not None:
103+ if (lfc % num_instances) != instance_id:
104+ continue
105
106 log.debug('Found {0} ({1})'.format(lfc, filename))
107
108@@ -175,6 +186,8 @@ def to_swift(log, start_lfc_id=None, end_lfc_id=None, remove_func=False):
109 if remove_func:
110 remove_func(fs_path)
111
112+ connection_pool.put(swift_connection)
113+
114
115 def rename(path):
116 # It would be nice to move the file out of the tree entirely, but we
117diff --git a/lib/lp/services/librarianserver/tests/test_swift.py b/lib/lp/services/librarianserver/tests/test_swift.py
118index 98ebc86..b891b0d 100644
119--- a/lib/lp/services/librarianserver/tests/test_swift.py
120+++ b/lib/lp/services/librarianserver/tests/test_swift.py
121@@ -289,6 +289,47 @@ class TestFeedSwift(TestCase):
122 # Our object round tripped
123 self.assertEqual(obj1 + obj2 + obj3, expected_content)
124
125+ def test_multiple_instances(self):
126+ log = BufferLogger()
127+
128+ # Confirm that files exist on disk where we expect to find them.
129+ for lfc in self.lfcs:
130+ path = swift.filesystem_path(lfc.id)
131+ self.assertTrue(os.path.exists(path))
132+
133+ # Migrate all the files into Swift, using multiple instances. For
134+ # each instance, only the matching files are processed.
135+ for instance_id in range(3):
136+ swift.to_swift(
137+ log, instance_id=instance_id, num_instances=3,
138+ remove_func=os.unlink)
139+
140+ # Only the matching files are gone from disk. (This is
141+ # cumulative, so we test for all instances up to this point.)
142+ for lfc in self.lfcs:
143+ exists = os.path.exists(swift.filesystem_path(lfc.id))
144+ if (lfc.id % 3) <= instance_id:
145+ self.assertFalse(exists)
146+ else:
147+ self.assertTrue(exists)
148+
149+ # Confirm all the files that have been migrated so far are in
150+ # Swift.
151+ swift_client = self.swift_fixture.connect()
152+ try:
153+ for lfc, contents in zip(self.lfcs, self.contents):
154+ container, name = swift.swift_location(lfc.id)
155+ if (lfc.id % 3) <= instance_id:
156+ headers, obj = swift_client.get_object(container, name)
157+ self.assertEqual(contents, obj, 'Did not round trip')
158+ else:
159+ self.assertRaises(
160+ swiftclient.ClientException,
161+ swift.quiet_swiftclient,
162+ swift_client.get_object, container, name)
163+ finally:
164+ swift_client.close()
165+
166
167 class TestHashStream(TestCase):
168 layer = BaseLayer

Subscribers

People subscribed via source and target branches

to status/vote changes: