Merge ~tsimonq2/autopkgtest-cloud:cache-amqp-queue into autopkgtest-cloud:master

Proposed by Simon Quigley
Status: Merged
Merged at revision: 2a51459f79e35260465474f28c6c32e46474af4b
Proposed branch: ~tsimonq2/autopkgtest-cloud:cache-amqp-queue
Merge into: autopkgtest-cloud:master
Diff against target: 199 lines (+96/-65)
2 files modified
webcontrol/browse.cgi (+11/-65)
webcontrol/cache-amqp (+85/-0)
Reviewer Review Type Date Requested Status
Iain Lane Approve
Review via email: mp+383129@code.launchpad.net
To post a comment you must log in.
Revision history for this message
Iain Lane (laney) wrote :

Cheers. I made a lot of changes to this but it's basically what you did. See the commits on top for the details!

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1diff --git a/webcontrol/browse.cgi b/webcontrol/browse.cgi
2index 9652c0a..c180068 100755
3--- a/webcontrol/browse.cgi
4+++ b/webcontrol/browse.cgi
5@@ -11,7 +11,6 @@ import re
6 import distro_info
7 from collections import OrderedDict
8
9-import amqplib.client_0_8 as amqp
10 from wsgiref.handlers import CGIHandler
11 import flask
12
13@@ -106,36 +105,6 @@ def srchash(src):
14 return src[0]
15
16
17-def get_queue_requests(amqp_channel, queue_name, queue_size):
18- '''Return list of pending requests in AMQP queue'''
19-
20- delivery_tags = []
21- requests = []
22-
23- # This is going to be too slow
24- if queue_size > 500:
25- return ['Too many requests to display']
26-
27- # non-acking read of all requests to inspect the queue
28- while True:
29- r = amqp_channel.basic_get(queue_name)
30- if r is None:
31- break
32- requests.append(r.body)
33- delivery_tags.append(r.delivery_tag)
34- # now explicitly reject the requests so that they become available again
35- # for real workers
36- for t in delivery_tags:
37- amqp_channel.basic_reject(t, True)
38-
39- res = []
40- for r in requests:
41- if isinstance(r, bytes):
42- r = r.decode('UTF-8')
43- res.append(r)
44- return res
45-
46-
47 def get_release_arches():
48 '''Determine available releases and architectures
49
50@@ -156,39 +125,16 @@ def get_queue_info(include_requests=True):
51
52 Return (releases, arches, context -> release -> arch -> (queue_size, [requests])).
53 '''
54- if not amqp_uri:
55- return ([], [], {})
56-
57- # connect to AMQP
58- parts = urllib.parse.urlsplit(amqp_uri, allow_fragments=False)
59- amqp_con = amqp.Connection(parts.hostname, userid=parts.username,
60- password=parts.password)
61- ch = amqp_con.channel()
62- release_arches = get_release_arches()
63- all_arches = set()
64-
65- result = {}
66- for context in AMQP_CONTEXTS:
67- for release, arches in release_arches.items():
68- for arch in arches:
69- if context == 'ubuntu':
70- # ubuntu test requests use context-less name (backwards compatibility)
71- queue_name = 'debci-%s-%s' % (release, arch)
72- else:
73- queue_name = 'debci-%s-%s-%s' % (context, release, arch)
74- (_, queue_size, _) = ch.queue_declare(queue=queue_name,
75- durable=True,
76- exclusive=False,
77- auto_delete=False)
78- if include_requests:
79- requests = get_queue_requests(ch, queue_name, queue_size)
80- else:
81- requests = []
82- result.setdefault(context, {}).setdefault(release, {})[arch] = (queue_size,
83- requests)
84- all_arches.add(arch)
85-
86- return (release_arches.keys(), sorted(all_arches), result)
87+
88+ if include_requests:
89+ cache = '/tmp/queued.json'
90+ else:
91+ cache = '/tmp/queued_no_requests.json'
92+
93+ with open(cache, 'r') as queued:
94+ queue_info = tuple(json.load(queued))
95+
96+ return queue_info
97
98
99 def get_source_versions(db, release):
100@@ -328,7 +274,7 @@ def running():
101 for c in AMQP_CONTEXTS:
102 for r in releases:
103 for a in arches:
104- (queue_length, queue_items) = queue_info.get(c, {}).get(r, {}).get(a, (0, []))
105+ (queue_length, queue_items) = queue_info.get(c, {}).get(r, {}).get(a, (0, []))
106 queue_lengths.setdefault(c, {}).setdefault(r, {})[a] = queue_length
107
108 try:
109diff --git a/webcontrol/cache-amqp b/webcontrol/cache-amqp
110new file mode 100755
111index 0000000..af01bd9
112--- /dev/null
113+++ b/webcontrol/cache-amqp
114@@ -0,0 +1,85 @@
115+#!/usr/bin/python3
116+
117+import json
118+import urllib
119+import amqplib.client_0_8 as amqp
120+
121+amqp_uri = None
122+
123+
124+def get_queue_requests(amqp_channel, queue_name, queue_size):
125+ '''Return list of pending requests in AMQP queue'''
126+
127+ delivery_tags = []
128+ requests = []
129+
130+ # This is going to be too slow
131+ if queue_size > 1000:
132+ return ['1000+ requests, too many to display']
133+
134+ # non-acking read of all requests to inspect the queue
135+ while True:
136+ r = amqp_channel.basic_get(queue_name)
137+ if r is None:
138+ break
139+ requests.append(r.body)
140+ delivery_tags.append(r.delivery_tag)
141+ # now explicitly reject the requests so that they become available again
142+ # for real workers
143+ for t in delivery_tags:
144+ amqp_channel.basic_reject(t, True)
145+
146+ res = []
147+ for r in requests:
148+ if isinstance(r, bytes):
149+ r = r.decode('UTF-8')
150+ res.append(r)
151+ return res
152+
153+
154+def query_amqp(output, include_requests=True):
155+ '''Get queue contents from AMQP and cache them on disk'''
156+
157+ queued = ""
158+
159+ if not amqp_uri:
160+ queued = ([], [], {})
161+
162+ # connect to AMQP
163+ parts = urllib.parse.urlsplit(amqp_uri, allow_fragments=False)
164+ amqp_con = amqp.Connection(parts.hostname, userid=parts.username,
165+ password=parts.password)
166+ ch = amqp_con.channel()
167+ release_arches = get_release_arches()
168+ all_arches = set()
169+
170+ result = {}
171+ for context in AMQP_CONTEXTS:
172+ for release, arches in release_arches.items():
173+ for arch in arches:
174+ if context == 'ubuntu':
175+ # ubuntu test requests use context-less name (backwards compatibility)
176+ queue_name = 'debci-%s-%s' % (release, arch)
177+ else:
178+ queue_name = 'debci-%s-%s-%s' % (context, release, arch)
179+ (_, queue_size, _) = ch.queue_declare(queue=queue_name,
180+ durable=True,
181+ exclusive=False,
182+ auto_delete=False)
183+ if include_requests:
184+ requests = get_queue_requests(ch, queue_name, queue_size)
185+ else:
186+ requests = []
187+ result.setdefault(context, {}).setdefault(release, {})[arch] = (queue_size,
188+ requests)
189+ all_arches.add(arch)
190+
191+ queued = (release_arches.keys(), sorted(all_arches), result)
192+
193+ with open(output, 'w') as f:
194+ json.dump(queued, f)
195+
196+
197+if __name__ == '__main__':
198+ query_amqp('/tmp/queued.json')
199+ query_amqp('/tmp/queued_no_requests.json')

Subscribers

People subscribed via source and target branches