Merge ~tsimonq2/autopkgtest-cloud:cache-amqp-queue into autopkgtest-cloud:master

Proposed by Simon Quigley
Status: Merged
Merged at revision: 2a51459f79e35260465474f28c6c32e46474af4b
Proposed branch: ~tsimonq2/autopkgtest-cloud:cache-amqp-queue
Merge into: autopkgtest-cloud:master
Diff against target: 199 lines (+96/-65)
2 files modified
webcontrol/browse.cgi (+11/-65)
webcontrol/cache-amqp (+85/-0)
Reviewer: Iain Lane
Review status: Approve
Review via email: mp+383129@code.launchpad.net
Iain Lane (laney) wrote:

Cheers. I made a lot of changes to this but it's basically what you did. See the commits on top for the details!

review: Approve

Preview Diff

diff --git a/webcontrol/browse.cgi b/webcontrol/browse.cgi
index 9652c0a..c180068 100755
--- a/webcontrol/browse.cgi
+++ b/webcontrol/browse.cgi
@@ -11,7 +11,6 @@ import re
 import distro_info
 from collections import OrderedDict
 
-import amqplib.client_0_8 as amqp
 from wsgiref.handlers import CGIHandler
 import flask
 
@@ -106,36 +105,6 @@ def srchash(src):
     return src[0]
 
 
-def get_queue_requests(amqp_channel, queue_name, queue_size):
-    '''Return list of pending requests in AMQP queue'''
-
-    delivery_tags = []
-    requests = []
-
-    # This is going to be too slow
-    if queue_size > 500:
-        return ['Too many requests to display']
-
-    # non-acking read of all requests to inspect the queue
-    while True:
-        r = amqp_channel.basic_get(queue_name)
-        if r is None:
-            break
-        requests.append(r.body)
-        delivery_tags.append(r.delivery_tag)
-    # now explicitly reject the requests so that they become available again
-    # for real workers
-    for t in delivery_tags:
-        amqp_channel.basic_reject(t, True)
-
-    res = []
-    for r in requests:
-        if isinstance(r, bytes):
-            r = r.decode('UTF-8')
-        res.append(r)
-    return res
-
-
 def get_release_arches():
     '''Determine available releases and architectures
 
@@ -156,39 +125,16 @@ def get_queue_info(include_requests=True):
 
     Return (releases, arches, context -> release -> arch -> (queue_size, [requests])).
     '''
-    if not amqp_uri:
-        return ([], [], {})
-
-    # connect to AMQP
-    parts = urllib.parse.urlsplit(amqp_uri, allow_fragments=False)
-    amqp_con = amqp.Connection(parts.hostname, userid=parts.username,
-                               password=parts.password)
-    ch = amqp_con.channel()
-    release_arches = get_release_arches()
-    all_arches = set()
-
-    result = {}
-    for context in AMQP_CONTEXTS:
-        for release, arches in release_arches.items():
-            for arch in arches:
-                if context == 'ubuntu':
-                    # ubuntu test requests use context-less name (backwards compatibility)
-                    queue_name = 'debci-%s-%s' % (release, arch)
-                else:
-                    queue_name = 'debci-%s-%s-%s' % (context, release, arch)
-                (_, queue_size, _) = ch.queue_declare(queue=queue_name,
-                                                      durable=True,
-                                                      exclusive=False,
-                                                      auto_delete=False)
-                if include_requests:
-                    requests = get_queue_requests(ch, queue_name, queue_size)
-                else:
-                    requests = []
-                result.setdefault(context, {}).setdefault(release, {})[arch] = (queue_size,
-                                                                                requests)
-                all_arches.add(arch)
-
-    return (release_arches.keys(), sorted(all_arches), result)
+
+    if include_requests:
+        cache = '/tmp/queued.json'
+    else:
+        cache = '/tmp/queued_no_requests.json'
+
+    with open(cache, 'r') as queued:
+        queue_info = tuple(json.load(queued))
+
+    return queue_info
 
 
 def get_source_versions(db, release):
@@ -328,7 +274,7 @@ def running():
     for c in AMQP_CONTEXTS:
         for r in releases:
             for a in arches:
-                (queue_length, queue_items) = queue_info.get(c, {}).get(r, {}).get(a, (0, []))
+                (queue_length, queue_items) = queue_info.get(c, {}).get(r, {}).get(a, (0, []))
                 queue_lengths.setdefault(c, {}).setdefault(r, {})[a] = queue_length
 
     try:
diff --git a/webcontrol/cache-amqp b/webcontrol/cache-amqp
new file mode 100755
index 0000000..af01bd9
--- /dev/null
+++ b/webcontrol/cache-amqp
@@ -0,0 +1,85 @@
+#!/usr/bin/python3
+
+import json
+import urllib
+import amqplib.client_0_8 as amqp
+
+amqp_uri = None
+
+
+def get_queue_requests(amqp_channel, queue_name, queue_size):
+    '''Return list of pending requests in AMQP queue'''
+
+    delivery_tags = []
+    requests = []
+
+    # This is going to be too slow
+    if queue_size > 1000:
+        return ['1000+ requests, too many to display']
+
+    # non-acking read of all requests to inspect the queue
+    while True:
+        r = amqp_channel.basic_get(queue_name)
+        if r is None:
+            break
+        requests.append(r.body)
+        delivery_tags.append(r.delivery_tag)
+    # now explicitly reject the requests so that they become available again
+    # for real workers
+    for t in delivery_tags:
+        amqp_channel.basic_reject(t, True)
+
+    res = []
+    for r in requests:
+        if isinstance(r, bytes):
+            r = r.decode('UTF-8')
+        res.append(r)
+    return res
+
+
+def query_amqp(output, include_requests=True):
+    '''Get queue contents from AMQP and cache them on disk'''
+
+    queued = ""
+
+    if not amqp_uri:
+        queued = ([], [], {})
+
+    # connect to AMQP
+    parts = urllib.parse.urlsplit(amqp_uri, allow_fragments=False)
+    amqp_con = amqp.Connection(parts.hostname, userid=parts.username,
+                               password=parts.password)
+    ch = amqp_con.channel()
+    release_arches = get_release_arches()
+    all_arches = set()
+
+    result = {}
+    for context in AMQP_CONTEXTS:
+        for release, arches in release_arches.items():
+            for arch in arches:
+                if context == 'ubuntu':
+                    # ubuntu test requests use context-less name (backwards compatibility)
+                    queue_name = 'debci-%s-%s' % (release, arch)
+                else:
+                    queue_name = 'debci-%s-%s-%s' % (context, release, arch)
+                (_, queue_size, _) = ch.queue_declare(queue=queue_name,
+                                                      durable=True,
+                                                      exclusive=False,
+                                                      auto_delete=False)
+                if include_requests:
+                    requests = get_queue_requests(ch, queue_name, queue_size)
+                else:
+                    requests = []
+                result.setdefault(context, {}).setdefault(release, {})[arch] = (queue_size,
+                                                                                requests)
+                all_arches.add(arch)
+
+    queued = (release_arches.keys(), sorted(all_arches), result)
+
+    with open(output, 'w') as f:
+        json.dump(queued, f)
+
+
+if __name__ == '__main__':
+    query_amqp('/tmp/queued.json')
+    query_amqp('/tmp/queued_no_requests.json')
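
The handoff between the two pieces is just the JSON cache file: cache-amqp dumps the (releases, arches, result) tuple and get_queue_info() in browse.cgi loads it back. As a rough sketch of that format, assuming hypothetical release, architecture and request names (and assuming the tuple and its dict_keys member are serialised as plain JSON arrays), /tmp/queued.json would contain something like:

    [["focal", "bionic"],
     ["amd64", "arm64"],
     {"ubuntu": {"focal": {"amd64": [2, ["request-1", "request-2"]],
                           "arm64": [0, []]}}}]

After json.load() the (queue_size, requests) tuples come back as two-element lists, which the existing tuple unpacking in running() handles the same way.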
