Merge lp:~sidnei/txstatsd/dup-flag into lp:txstatsd

Proposed by Sidnei da Silva
Status: Merged
Approved by: Sidnei da Silva
Approved revision: 77
Merged at revision: 76
Proposed branch: lp:~sidnei/txstatsd/dup-flag
Merge into: lp:txstatsd
Diff against target: 140 lines (+72/-12)
2 files modified
txstatsd/server/router.py (+25/-12)
txstatsd/tests/test_router.py (+47/-0)
To merge this branch: bzr merge lp:~sidnei/txstatsd/dup-flag
Reviewer | Review Type | Date Requested | Status
Martin Albisetti (community) | | | Approve
Review via email: mp+97509@code.launchpad.net

Commit message

Add a dup flag to the rewrite and set_metric_type targets so that each yields the original message in addition to the rewritten one.

Description of the change

Add a dup flag to the rewrite and set_metric_type targets so that each yields
the original message in addition to the rewritten one.

To post a comment you must log in.
Revision history for this message
Martin Albisetti (beuno) :
review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'txstatsd/server/router.py'
2--- txstatsd/server/router.py 2012-02-07 12:28:35 +0000
3+++ txstatsd/server/router.py 2012-03-14 20:15:23 +0000
4@@ -230,21 +230,25 @@
5 def build_target_drop(self):
6 """Returns a target that stops the processing of a message."""
7 def drop(*args):
8- raise StopProcessingException()
9+ return
10 return drop
11
12- def build_target_rewrite(self, pattern, repl):
13+ def build_target_rewrite(self, pattern, repl, dup="no-dup"):
14 rexp = re.compile(pattern)
15
16 def rewrite_target(metric_type, key, fields):
17+ if dup == "dup" and rexp.match(key) is not None:
18+ yield metric_type, key, fields
19 key = rexp.sub(repl, key)
20- return (metric_type, key, fields)
21+ yield metric_type, key, fields
22
23 return rewrite_target
24
25- def build_target_set_metric_type(self, metric_type):
26+ def build_target_set_metric_type(self, metric_type, dup="no-dup"):
27 def set_metric_type(_, key, fields):
28- return metric_type, key, fields
29+ if dup == "dup":
30+ yield _, key, fields
31+ yield metric_type, key, fields
32 return set_metric_type
33
34 def build_target_redirect_udp(self, host, port):
35@@ -262,7 +266,7 @@
36 def redirect_udp_target(metric_type, key, fields):
37 message = self.rebuild_message(metric_type, key, fields)
38 protocol.write(message)
39- return metric_type, key, fields
40+ yield metric_type, key, fields
41 return redirect_udp_target
42
43 def build_target_redirect_tcp(self, host, port):
44@@ -280,17 +284,26 @@
45 def redirect_tcp_target(metric_type, key, fields):
46 message = self.rebuild_message(metric_type, key, fields)
47 factory.write(message)
48- return metric_type, key, fields
49+ yield metric_type, key, fields
50 return redirect_tcp_target
51
52 def process_message(self, message, metric_type, key, fields):
53- try:
54+ metrics = [(metric_type, key, fields)]
55+ if self.rules:
56+ pending, metrics = metrics, []
57 for condition, target in self.rules:
58- if condition(metric_type, key, fields):
59- metric_type, key, fields = target(metric_type, key, fields)
60+ if not pending:
61+ return
62+ for metric_type, key, fields in pending:
63+ if not condition(metric_type, key, fields):
64+ metrics.append((metric_type, key, fields))
65+ continue
66+ result = target(metric_type, key, fields)
67+ if result is not None:
68+ metrics.extend(result)
69+ pending = metrics
70
71+ for (metric_type, key, fields) in metrics:
72 message = self.rebuild_message(metric_type, key, fields)
73 self.message_processor.process_message(message, metric_type,
74 key, fields)
75- except StopProcessingException:
76- pass
77
78=== modified file 'txstatsd/tests/test_router.py'
79--- txstatsd/tests/test_router.py 2012-02-07 12:28:35 +0000
80+++ txstatsd/tests/test_router.py 2012-03-14 20:15:23 +0000
81@@ -85,6 +85,31 @@
82 self.assertEqual(self.processor.messages[0][2], "glork.gorets")
83 self.assertEqual(self.processor.messages[1][2], "nomatch")
84
85+ def test_rewrite_and_dup(self):
86+ """
87+ Process all messages but only rewrite matching ones. If dup flag is set
88+ then duplicate original message without rewriting it.
89+ """
90+ self.update_rules(r"any => rewrite (gorets) glork.\1 dup")
91+ self.router.process("gorets:1|c")
92+ self.router.process("nomatch:1|d")
93+ self.assertEqual(len(self.processor.messages), 3)
94+ self.assertEqual(self.processor.messages[0][2], "gorets")
95+ self.assertEqual(self.processor.messages[1][2], "glork.gorets")
96+ self.assertEqual(self.processor.messages[2][2], "nomatch")
97+
98+ def test_rewrite_and_no_dup(self):
99+ """
100+ Process all messages but only rewrite matching ones. If dup flag is set
101+ to no-dup, then the original message is not duplicated.
102+ """
103+ self.update_rules(r"any => rewrite (gorets) glork.\1 no-dup")
104+ self.router.process("gorets:1|c")
105+ self.router.process("nomatch:1|d")
106+ self.assertEqual(len(self.processor.messages), 2)
107+ self.assertEqual(self.processor.messages[0][2], "glork.gorets")
108+ self.assertEqual(self.processor.messages[1][2], "nomatch")
109+
110 def test_set_metric_type(self):
111 """
112 Set metric type to something else.
113@@ -94,6 +119,28 @@
114 self.assertEqual(self.processor.messages[0][1], "d")
115 self.assertEqual(self.processor.messages[0][2], "gorets")
116
117+ def test_set_metric_type_dup(self):
118+ """
119+ Set metric type to something else. If the dup flag is set, duplicate
120+ the original message.
121+ """
122+ self.update_rules(r"any => set_metric_type d dup")
123+ self.router.process("gorets:1|c")
124+ self.assertEqual(self.processor.messages[0][1], "c")
125+ self.assertEqual(self.processor.messages[0][2], "gorets")
126+ self.assertEqual(self.processor.messages[1][1], "d")
127+ self.assertEqual(self.processor.messages[1][2], "gorets")
128+
129+ def test_set_metric_type_no_dup(self):
130+ """
131+ Set metric type to something else. If the dup flag is set to no-dup
132+ then do not duplicate the original message.
133+ """
134+ self.update_rules(r"any => set_metric_type d no-dup")
135+ self.router.process("gorets:1|c")
136+ self.assertEqual(self.processor.messages[0][1], "d")
137+ self.assertEqual(self.processor.messages[0][2], "gorets")
138+
139
140 class TestUDPRedirect(TxTestCase):
141

Subscribers

People subscribed via source and target branches