Merge ~pappacena/turnip:parallel-rpc-call into turnip:master

Proposed by Thiago F. Pappacena
Status: Rejected
Rejected by: Thiago F. Pappacena
Proposed branch: ~pappacena/turnip:parallel-rpc-call
Merge into: turnip:master
Prerequisite: ~pappacena/turnip:paginated-check-refs-permissions
Diff against target: 264 lines (+110/-29)
5 files modified
requirements.txt (+1/-0)
setup.py (+1/-0)
turnip/pack/hooks/hook.py (+46/-16)
turnip/pack/tests/test_hooks.py (+21/-13)
turnip/tests/test_helpers.py (+41/-0)
Reviewer Review Type Date Requested Status
Launchpad code reviewers Pending
Review via email: mp+384684@code.launchpad.net

Commit message

Run branch permission checks in parallel, in order to speed up git push operations for large sets of tags & branches.

To post a comment you must log in.

Unmerged commits

a177779... by Thiago F. Pappacena

Running permission checks in parallel on hooks

7f07110... by Thiago F. Pappacena

Fixing comment

545ac8a... by Thiago F. Pappacena

Merge branch 'paginated-check-refs-permissions' of git+ssh://git.launchpad.net/~pappacena/turnip into paginated-check-refs-permissions

cfa68f9... by Thiago F. Pappacena

Paginating check_ref_permissions RPC calls to avoid timeouts

44036eb... by Thiago F. Pappacena

Paginating check_ref_permissions RPC calls to avoid timeouts

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1diff --git a/requirements.txt b/requirements.txt
2index cb86dfe..4f71780 100644
3--- a/requirements.txt
4+++ b/requirements.txt
5@@ -11,6 +11,7 @@ cornice==3.6.1
6 cryptography==2.8
7 docutils==0.14
8 enum34==1.1.9
9+futures==3.3.0
10 envdir==0.7
11 extras==1.0.0
12 fixtures==3.0.0
13diff --git a/setup.py b/setup.py
14index b5821f7..bb22bae 100755
15--- a/setup.py
16+++ b/setup.py
17@@ -21,6 +21,7 @@ requires = [
18 'contextlib2',
19 'cornice',
20 'enum34; python_version < "3.4"',
21+ 'futures; python_version < "3.2"',
22 'lazr.sshserver>=0.1.7',
23 'Paste',
24 'pygit2>=0.27.4,<0.28.0',
25diff --git a/turnip/pack/hooks/hook.py b/turnip/pack/hooks/hook.py
26index b1348fa..e527275 100755
27--- a/turnip/pack/hooks/hook.py
28+++ b/turnip/pack/hooks/hook.py
29@@ -10,6 +10,7 @@ from __future__ import (
30 )
31
32 import base64
33+from concurrent.futures import ThreadPoolExecutor
34 import json
35 import os
36 import socket
37@@ -24,6 +25,20 @@ from turnip.compat.files import fd_buffer
38 # that currently causes CFFI warnings to be returned to the client.
39 GIT_OID_HEX_ZERO = '0'*40
40
41+# Be careful when adjusting these numbers. A page size that is too big may
42+# lead to stale connections and timeouts, while small page sizes could lead
43+# to poor performance.
44+# As for the number of workers, too many might hammer Launchpad too hard,
45+# and too few of them might lead to poor performance.
46+XML_RPC_PAGE_SIZE = 100
47+XML_RPC_MAX_WORKERS = 4
48+
49+
50+def get_socket():
51+ sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
52+ sock.connect(os.environ['TURNIP_HOOK_RPC_SOCK'])
53+ return sock
54+
55
56 def check_ancestor(old, new):
57 # This is a delete, setting the new ref.
58@@ -130,7 +145,8 @@ def netstring_recv(sock):
59 return bytes(s)
60
61
62-def rpc_invoke(sock, method, args):
63+def rpc_invoke(method, args):
64+ sock = get_socket()
65 msg = dict(args)
66 assert 'op' not in msg
67 msg['op'] = method
68@@ -147,17 +163,33 @@ def split_list(lst, n):
69 return [lst[i:i + n] for i in range(0, len(lst), n)]
70
71
72-def check_ref_permissions(sock, rpc_key, ref_paths, page_size=25):
73+def check_ref_permissions(rpc_key, ref_paths, page_size=XML_RPC_PAGE_SIZE,
74+ max_workers=XML_RPC_MAX_WORKERS):
75+ """Run XML RPC call to check refs permissions.
76+
77+ For large sets of ref_paths, this method paginates them into several
78+ XML RPC calls of at most `page_size` refs each, and runs them in
79+ parallel, `max_workers` at a time.
80+ """
81 ref_paths = [base64.b64encode(path).decode('UTF-8') for path in ref_paths]
82 permissions = {}
83- # Paginate the rpc calls to avoid timeouts.
84- for ref_paths_chunk in split_list(ref_paths, page_size):
85- rule_lines = rpc_invoke(
86- sock, 'check_ref_permissions',
87- {'key': rpc_key, 'paths': ref_paths_chunk})
88- permissions.update({
89- base64.b64decode(path.encode('UTF-8')): permissions
90- for path, permissions in rule_lines.items()})
91+
92+ # Paginate the rpc calls and run them in parallel, to avoid timeouts and
93+ # speed up the request a bit.
94+ chunks = split_list(ref_paths, page_size)
95+ with ThreadPoolExecutor(max_workers=max_workers) as executor:
96+ futures = []
97+ for ref_paths_chunk in chunks:
98+ future = executor.submit(
99+ rpc_invoke, 'check_ref_permissions',
100+ {'key': rpc_key, 'paths': ref_paths_chunk})
101+ futures.append(future)
102+
103+ for future in futures:
104+ rule_lines = future.result()
105+ permissions.update({
106+ base64.b64decode(path.encode('UTF-8')): permissions
107+ for path, permissions in rule_lines.items()})
108 return permissions
109
110
111@@ -175,7 +207,7 @@ def send_mp_url(received_line):
112 pushed_branch = ref[len(b'refs/heads/'):]
113 if not is_default_branch(pushed_branch):
114 mp_url = rpc_invoke(
115- sock, 'get_mp_url',
116+ 'get_mp_url',
117 {'key': rpc_key,
118 'branch': six.ensure_text(pushed_branch, "UTF-8")})
119 if mp_url is not None:
120@@ -193,14 +225,12 @@ if __name__ == '__main__':
121 stdin = fd_buffer(sys.stdin)
122 stdout = fd_buffer(sys.stdout)
123 rpc_key = os.environ['TURNIP_HOOK_RPC_KEY']
124- sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
125- sock.connect(os.environ['TURNIP_HOOK_RPC_SOCK'])
126 hook = os.path.basename(sys.argv[0])
127 if hook == 'pre-receive':
128 # Verify the proposed changes against rules from the server.
129 raw_paths = stdin.readlines()
130 ref_paths = [p.rstrip(b'\n').split(b' ', 2)[2] for p in raw_paths]
131- rule_lines = check_ref_permissions(sock, rpc_key, ref_paths)
132+ rule_lines = check_ref_permissions(rpc_key, ref_paths)
133 errors = match_rules(rule_lines, raw_paths)
134 for error in errors:
135 stdout.write(error + b'\n')
136@@ -210,13 +240,13 @@ if __name__ == '__main__':
137 # Details of the changes aren't currently included.
138 lines = stdin.readlines()
139 if lines:
140- rpc_invoke(sock, 'notify_push', {'key': rpc_key})
141+ rpc_invoke('notify_push', {'key': rpc_key})
142 if len(lines) == 1:
143 send_mp_url(lines[0])
144 sys.exit(0)
145 elif hook == 'update':
146 ref = sys.argv[1]
147- rule_lines = check_ref_permissions(sock, rpc_key, [ref])
148+ rule_lines = check_ref_permissions(rpc_key, [ref])
149 errors = match_update_rules(rule_lines, sys.argv[1:4])
150 for error in errors:
151 stdout.write(error + b'\n')
152diff --git a/turnip/pack/tests/test_hooks.py b/turnip/pack/tests/test_hooks.py
153index b383d6b..0bd2aaa 100644
154--- a/turnip/pack/tests/test_hooks.py
155+++ b/turnip/pack/tests/test_hooks.py
156@@ -17,6 +17,8 @@ from fixtures import (
157 MonkeyPatch,
158 TempDir,
159 )
160+
161+
162 try:
163 from unittest import mock
164 except ImportError:
165@@ -35,6 +37,7 @@ from turnip.pack import hookrpc
166 from turnip.pack.helpers import ensure_hooks
167 from turnip.pack.hooks import hook
168 from turnip.pack.hooks.hook import split_list, check_ref_permissions
169+from turnip.tests.test_helpers import MockThreadPoolExecutor
170
171
172 class HookProcessProtocol(protocol.ProcessProtocol):
173@@ -472,10 +475,19 @@ class TestSplitRefPathsCalls(TestCase):
174 {encode(b'master'): [], encode(b'develop'): []},
175 {encode(b'head'): []}
176 ]
177- sock = mock.Mock()
178- # Call it with page size = 2
179- result = check_ref_permissions(
180- sock, "rpc-key", [b"master", b"develop", b"head"], 2)
181+
182+ # Call it with page size = 2 and maximum 10 parallel workers.
183+ pool = MockThreadPoolExecutor()
184+ executor = mock.Mock()
185+ executor.return_value = pool
186+ with mock.patch('turnip.pack.hooks.hook.ThreadPoolExecutor', executor):
187+ result = check_ref_permissions(
188+ "rpc-key", [b"master", b"develop", b"head"], 2, 10)
189+
190+ # Make sure it executed in parallel using ThreadPoolExecutor.
191+ self.assertEqual(1, executor.call_count)
192+ self.assertEqual(mock.call(max_workers=10), executor.call_args)
193+ self.assertEqual(2, len(pool.futures))
194
195 # The final result should have been joined into.
196 self.assertEqual(
197@@ -483,13 +495,9 @@ class TestSplitRefPathsCalls(TestCase):
198
199 # Check that it called correctly the rpc_invoke method.
200 self.assertEqual(rpc_invoke.call_args_list, [
201- mock.call(
202- sock, 'check_ref_permissions', {
203- 'key': 'rpc-key', 'paths': [
204- encode(b"master"), encode(b"develop")]
205- }),
206- mock.call(
207- sock, 'check_ref_permissions', {
208- 'key': 'rpc-key', 'paths': [encode(b"head")]
209- }),
210+ mock.call('check_ref_permissions', {
211+ 'key': 'rpc-key', 'paths': [
212+ encode(b"master"), encode(b"develop")]}),
213+ mock.call('check_ref_permissions', {
214+ 'key': 'rpc-key', 'paths': [encode(b"head")]}),
215 ])
216diff --git a/turnip/tests/test_helpers.py b/turnip/tests/test_helpers.py
217index 9dc4101..23fda1e 100644
218--- a/turnip/tests/test_helpers.py
219+++ b/turnip/tests/test_helpers.py
220@@ -39,3 +39,44 @@ class TestComposePath(TestCase):
221 ValueError, helpers.compose_path, b'/foo', b'../bar')
222 self.assertRaises(
223 ValueError, helpers.compose_path, b'/foo', b'/foo/../../bar')
224+
225+
226+class MockThreadPoolExecutor:
227+ """A mock class for concurrent.futures.*PoolExecutor that executes
228+ everything sequentially, keeping every generated Future in
229+ self.futures.
230+ """
231+ class Future:
232+ def __init__(self, value=None, exception=None):
233+ self.value = value
234+ self.exeception = exception
235+
236+ def result(self):
237+ if self.exeception:
238+ raise self.exeception
239+ return self.value
240+
241+ def __init__(self, **kwargs):
242+ self.futures = []
243+
244+ def __enter__(self):
245+ return self
246+
247+ def __exit__(self, exc_type, exc_value, exc_traceback):
248+ pass
249+
250+ def submit(self, fn, *args, **kwargs):
251+ # Execute functions in series, keeping the generated futures in the
252+ # self.futures list.
253+ exception = None
254+ result = None
255+ try:
256+ result = fn(*args, **kwargs)
257+ except Exception as e:
258+ exception = e
259+ future = MockThreadPoolExecutor.Future(result, exception)
260+ self.futures.append(future)
261+ return future
262+
263+ def shutdown(self, wait=True):
264+ pass

Subscribers

People subscribed via source and target branches