Merge lp:~sidnei/txstatsd/dup-flag into lp:txstatsd

Proposed by Sidnei da Silva
Status: Merged
Approved by: Sidnei da Silva
Approved revision: 77
Merged at revision: 76
Proposed branch: lp:~sidnei/txstatsd/dup-flag
Merge into: lp:txstatsd
Diff against target: 140 lines (+72/-12)
2 files modified
txstatsd/server/router.py (+25/-12)
txstatsd/tests/test_router.py (+47/-0)
To merge this branch: bzr merge lp:~sidnei/txstatsd/dup-flag
Reviewer Review Type Date Requested Status
Martin Albisetti (community) Approve
Review via email: mp+97509@code.launchpad.net

Commit message

Add dup flag to rewrite/set_metric_type targets, so it yields the original message in addition to the rewritten one.

Description of the change

Add dup flag to rewrite/set_metric_type targets, so it yields the original
message in addition to the rewritten one.

To post a comment you must log in.
Revision history for this message
Martin Albisetti (beuno) :
review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'txstatsd/server/router.py'
--- txstatsd/server/router.py 2012-02-07 12:28:35 +0000
+++ txstatsd/server/router.py 2012-03-14 20:15:23 +0000
@@ -230,21 +230,25 @@
230 def build_target_drop(self):230 def build_target_drop(self):
231 """Returns a target that stops the processing of a message."""231 """Returns a target that stops the processing of a message."""
232 def drop(*args):232 def drop(*args):
233 raise StopProcessingException()233 return
234 return drop234 return drop
235235
236 def build_target_rewrite(self, pattern, repl):236 def build_target_rewrite(self, pattern, repl, dup="no-dup"):
237 rexp = re.compile(pattern)237 rexp = re.compile(pattern)
238238
239 def rewrite_target(metric_type, key, fields):239 def rewrite_target(metric_type, key, fields):
240 if dup == "dup" and rexp.match(key) is not None:
241 yield metric_type, key, fields
240 key = rexp.sub(repl, key)242 key = rexp.sub(repl, key)
241 return (metric_type, key, fields)243 yield metric_type, key, fields
242244
243 return rewrite_target245 return rewrite_target
244246
245 def build_target_set_metric_type(self, metric_type):247 def build_target_set_metric_type(self, metric_type, dup="no-dup"):
246 def set_metric_type(_, key, fields):248 def set_metric_type(_, key, fields):
247 return metric_type, key, fields249 if dup == "dup":
250 yield _, key, fields
251 yield metric_type, key, fields
248 return set_metric_type252 return set_metric_type
249253
250 def build_target_redirect_udp(self, host, port):254 def build_target_redirect_udp(self, host, port):
@@ -262,7 +266,7 @@
262 def redirect_udp_target(metric_type, key, fields):266 def redirect_udp_target(metric_type, key, fields):
263 message = self.rebuild_message(metric_type, key, fields)267 message = self.rebuild_message(metric_type, key, fields)
264 protocol.write(message)268 protocol.write(message)
265 return metric_type, key, fields269 yield metric_type, key, fields
266 return redirect_udp_target270 return redirect_udp_target
267271
268 def build_target_redirect_tcp(self, host, port):272 def build_target_redirect_tcp(self, host, port):
@@ -280,17 +284,26 @@
280 def redirect_tcp_target(metric_type, key, fields):284 def redirect_tcp_target(metric_type, key, fields):
281 message = self.rebuild_message(metric_type, key, fields)285 message = self.rebuild_message(metric_type, key, fields)
282 factory.write(message)286 factory.write(message)
283 return metric_type, key, fields287 yield metric_type, key, fields
284 return redirect_tcp_target288 return redirect_tcp_target
285289
286 def process_message(self, message, metric_type, key, fields):290 def process_message(self, message, metric_type, key, fields):
287 try:291 metrics = [(metric_type, key, fields)]
292 if self.rules:
293 pending, metrics = metrics, []
288 for condition, target in self.rules:294 for condition, target in self.rules:
289 if condition(metric_type, key, fields):295 if not pending:
290 metric_type, key, fields = target(metric_type, key, fields)296 return
297 for metric_type, key, fields in pending:
298 if not condition(metric_type, key, fields):
299 metrics.append((metric_type, key, fields))
300 continue
301 result = target(metric_type, key, fields)
302 if result is not None:
303 metrics.extend(result)
304 pending = metrics
291305
306 for (metric_type, key, fields) in metrics:
292 message = self.rebuild_message(metric_type, key, fields)307 message = self.rebuild_message(metric_type, key, fields)
293 self.message_processor.process_message(message, metric_type,308 self.message_processor.process_message(message, metric_type,
294 key, fields)309 key, fields)
295 except StopProcessingException:
296 pass
297310
=== modified file 'txstatsd/tests/test_router.py'
--- txstatsd/tests/test_router.py 2012-02-07 12:28:35 +0000
+++ txstatsd/tests/test_router.py 2012-03-14 20:15:23 +0000
@@ -85,6 +85,31 @@
85 self.assertEqual(self.processor.messages[0][2], "glork.gorets")85 self.assertEqual(self.processor.messages[0][2], "glork.gorets")
86 self.assertEqual(self.processor.messages[1][2], "nomatch")86 self.assertEqual(self.processor.messages[1][2], "nomatch")
8787
88 def test_rewrite_and_dup(self):
89 """
90 Process all messages but only rewrite matching ones. If dup flag is set
91 then duplicate original message without rewriting it.
92 """
93 self.update_rules(r"any => rewrite (gorets) glork.\1 dup")
94 self.router.process("gorets:1|c")
95 self.router.process("nomatch:1|d")
96 self.assertEqual(len(self.processor.messages), 3)
97 self.assertEqual(self.processor.messages[0][2], "gorets")
98 self.assertEqual(self.processor.messages[1][2], "glork.gorets")
99 self.assertEqual(self.processor.messages[2][2], "nomatch")
100
101 def test_rewrite_and_no_dup(self):
102 """
103 Process all messages but only rewrite matching ones. If dup flag is set
104 to no-dup, then the original message is not duplicated.
105 """
106 self.update_rules(r"any => rewrite (gorets) glork.\1 no-dup")
107 self.router.process("gorets:1|c")
108 self.router.process("nomatch:1|d")
109 self.assertEqual(len(self.processor.messages), 2)
110 self.assertEqual(self.processor.messages[0][2], "glork.gorets")
111 self.assertEqual(self.processor.messages[1][2], "nomatch")
112
88 def test_set_metric_type(self):113 def test_set_metric_type(self):
89 """114 """
90 Set metric type to something else.115 Set metric type to something else.
@@ -94,6 +119,28 @@
94 self.assertEqual(self.processor.messages[0][1], "d")119 self.assertEqual(self.processor.messages[0][1], "d")
95 self.assertEqual(self.processor.messages[0][2], "gorets")120 self.assertEqual(self.processor.messages[0][2], "gorets")
96121
122 def test_set_metric_type_dup(self):
123 """
124 Set metric type to something else. If the dup flag is set, duplicate
125 the original message.
126 """
127 self.update_rules(r"any => set_metric_type d dup")
128 self.router.process("gorets:1|c")
129 self.assertEqual(self.processor.messages[0][1], "c")
130 self.assertEqual(self.processor.messages[0][2], "gorets")
131 self.assertEqual(self.processor.messages[1][1], "d")
132 self.assertEqual(self.processor.messages[1][2], "gorets")
133
134 def test_set_metric_type_no_dup(self):
135 """
136 Set metric type to something else. If the dup flag is set to no-dup
137 then do not duplicate the original message.
138 """
139 self.update_rules(r"any => set_metric_type d no-dup")
140 self.router.process("gorets:1|c")
141 self.assertEqual(self.processor.messages[0][1], "d")
142 self.assertEqual(self.processor.messages[0][2], "gorets")
143
97144
98class TestUDPRedirect(TxTestCase):145class TestUDPRedirect(TxTestCase):
99146

Subscribers

People subscribed via source and target branches