Merge lp:~verterok/lalita/zmq-proxy into lp:lalita
- zmq-proxy
- Merge into trunk
Proposed by
Guillermo Gonzalez
Status: | Merged |
---|---|
Approved by: | Facundo Batista |
Approved revision: | 179 |
Merged at revision: | 173 |
Proposed branch: | lp:~verterok/lalita/zmq-proxy |
Merge into: | lp:lalita |
Diff against target: |
665 lines (+497/-9) 11 files modified
docs/tutorial_en.rst (+5/-0) docs/tutorial_sp.rst (+4/-0) lalita.cfg.sample (+4/-0) lalita/core/tests/test_dispatcher.py (+3/-2) lalita/core/tests/test_events.py (+2/-1) lalita/core/tests/test_ircbot.py (+3/-3) lalita/core/tests/test_metacommands.py (+2/-1) lalita/plugins/tests/helper.py (+8/-2) lalita/plugins/tests/test_zmq_proxy.py (+204/-0) lalita/plugins/zmq_plugins/example.py (+52/-0) lalita/plugins/zmq_proxy.py (+210/-0) |
To merge this branch: | bzr merge lp:~verterok/lalita/zmq-proxy |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Facundo Batista | Approve | ||
Review via email: mp+113858@code.launchpad.net |
Commit message
ZeroMQ proxy plugin, converts lalita into a irc <-> ZeroMQ bridge (to create plugins in external processes)
Description of the change
Add a plugin that converts lalita into a irc <-> ZeroMQ bridge, this allow to create plugins in external processes.
To post a comment you must log in.
lp:~verterok/lalita/zmq-proxy
updated
- 178. By Guillermo Gonzalez
-
add info about zmq-proxy plugin in the docs.
Revision history for this message
Guillermo Gonzalez (verterok) wrote : | # |
lp:~verterok/lalita/zmq-proxy
updated
- 179. By Guillermo Gonzalez
-
fix typo
Revision history for this message
Facundo Batista (facundo) wrote : | # |
Awesome, thanks!
review:
Approve
lp:~verterok/lalita/zmq-proxy
updated
- 180. By Guillermo Gonzalez
-
add copyright notice
Preview Diff
[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1 | === modified file 'docs/tutorial_en.rst' |
2 | --- docs/tutorial_en.rst 2011-04-11 02:06:18 +0000 |
3 | +++ docs/tutorial_en.rst 2012-07-09 13:48:35 +0000 |
4 | @@ -633,6 +633,11 @@ |
5 | - url.py: Recollects all the URLs that are mentioned on the different channels, |
6 | and you can search them afterwards. |
7 | |
8 | +- zmq_proxy.py: It's a IRC-ZeroMQ <http://www.zeromq.org/> proxy/bridge, that |
9 | + publish all lalita events to a PUB/SUB ZeroMQ socket and listen for commands |
10 | + in other socket (in json format). See zmq_plugins/example.py for an example |
11 | + plugin using this. |
12 | + |
13 | |
14 | Advanced configuration |
15 | ====================== |
16 | |
17 | === modified file 'docs/tutorial_sp.rst' |
18 | --- docs/tutorial_sp.rst 2011-04-11 02:06:18 +0000 |
19 | +++ docs/tutorial_sp.rst 2012-07-09 13:48:35 +0000 |
20 | @@ -670,6 +670,10 @@ |
21 | - url.py: Va juntando todas las URLs que se van mencionando en los |
22 | distintos canales, y luego nos permite buscar en las mismas. |
23 | |
24 | +- zmq_proxy.py: Es un proxy/bridge entre IRC y ZeroMQ <http://www.zeromq.org/> |
25 | + que publica todos los eventos de lalita en un socket pub/sub de ZeroMQ y |
26 | + escucha 'comandos' en otro socket (en formato json). |
27 | + Ver zmq_plugins/example.py para un ejemplo de plugin en otro proceso. |
28 | |
29 | ConfiguraciĆ³n avanzada |
30 | ====================== |
31 | |
32 | === modified file 'lalita.cfg.sample' |
33 | --- lalita.cfg.sample 2011-04-11 02:05:31 +0000 |
34 | +++ lalita.cfg.sample 2012-07-09 13:48:35 +0000 |
35 | @@ -55,6 +55,10 @@ |
36 | # 'url.Url': { }, |
37 | 'lalita.plugins.photo.Photo': { }, |
38 | 'lalita.plugins.misc.Ping': { }, |
39 | + 'zmq_proxy.ZMQPlugin':{ |
40 | + 'events_address':'tcp://127.0.0.1:9090', |
41 | + 'bot_address':'tcp://127.0.0.1:9091', |
42 | + }, |
43 | }, |
44 | ), |
45 | 'example': dict ( |
46 | |
47 | === modified file 'lalita/core/tests/test_dispatcher.py' |
48 | --- lalita/core/tests/test_dispatcher.py 2010-06-22 00:12:42 +0000 |
49 | +++ lalita/core/tests/test_dispatcher.py 2012-07-09 13:48:35 +0000 |
50 | @@ -2,6 +2,7 @@ |
51 | # License: GPL v3 |
52 | # For further info, see LICENSE file |
53 | |
54 | +import logging |
55 | import re |
56 | |
57 | from collections import defaultdict |
58 | @@ -26,7 +27,7 @@ |
59 | ) |
60 | |
61 | ircbot_factory = ircbot.IRCBotFactory(server) |
62 | - ircbot.logger.setLevel("error") |
63 | + ircbot.logger.setLevel(logging.ERROR) |
64 | self.bot = ircbot.IrcBot() |
65 | self.bot.factory = ircbot_factory |
66 | self.bot.config = ircbot_factory.config |
67 | @@ -213,7 +214,7 @@ |
68 | if name == name.upper()] |
69 | for event in supported_events: |
70 | self.disp.register(event, self.helper.f) |
71 | - if (event in dispatcher.USER_POS and |
72 | + if (event in dispatcher.USER_POS and |
73 | dispatcher.USER_POS[event] is not None): |
74 | self.disp.push(event, 'user', 'channel', 'msg') |
75 | elif (event in dispatcher.CHANNEL_POS and |
76 | |
77 | === modified file 'lalita/core/tests/test_events.py' |
78 | --- lalita/core/tests/test_events.py 2010-06-20 16:34:35 +0000 |
79 | +++ lalita/core/tests/test_events.py 2012-07-09 13:48:35 +0000 |
80 | @@ -2,6 +2,7 @@ |
81 | # License: GPL v3 |
82 | # For further info, see LICENSE file |
83 | |
84 | +import logging |
85 | |
86 | from twisted.trial.unittest import TestCase as TwistedTestCase |
87 | from twisted.internet import defer |
88 | @@ -21,7 +22,7 @@ |
89 | def write(*a): pass |
90 | |
91 | ircbot_factory = ircbot.IRCBotFactory(server) |
92 | -ircbot.logger.setLevel("error") |
93 | +ircbot.logger.setLevel(logging.ERROR) |
94 | bot = ircbot.IrcBot() |
95 | bot.factory = ircbot_factory |
96 | bot.msg = lambda *a:None |
97 | |
98 | === modified file 'lalita/core/tests/test_ircbot.py' |
99 | --- lalita/core/tests/test_ircbot.py 2010-06-16 11:45:13 +0000 |
100 | +++ lalita/core/tests/test_ircbot.py 2012-07-09 13:48:35 +0000 |
101 | @@ -47,7 +47,7 @@ |
102 | def setUp(self): |
103 | server["log_config"] = {} |
104 | ircbot_factory = ircbot.IRCBotFactory(server) |
105 | - ircbot.logger.setLevel("error") |
106 | + ircbot.logger.setLevel(logging.ERROR) |
107 | self.bot = bot = ircbot.IrcBot() |
108 | bot.factory = ircbot_factory |
109 | bot.config = ircbot_factory.config |
110 | @@ -129,7 +129,7 @@ |
111 | |
112 | def setUp(self): |
113 | ircbot_factory = ircbot.IRCBotFactory(server) |
114 | - ircbot.logger.setLevel("error") |
115 | + ircbot.logger.setLevel(logging.ERROR) |
116 | self.bot = bot = ircbot.IrcBot() |
117 | bot.factory = ircbot_factory |
118 | bot.config = ircbot_factory.config |
119 | @@ -168,7 +168,7 @@ |
120 | |
121 | def setUp(self): |
122 | ircbot_factory = ircbot.IRCBotFactory(server.copy()) |
123 | - ircbot.logger.setLevel("error") |
124 | + ircbot.logger.setLevel(logging.ERROR) |
125 | self.bot = bot = ircbot.IrcBot() |
126 | bot.factory = ircbot_factory |
127 | bot.config = ircbot_factory.config |
128 | |
129 | === modified file 'lalita/core/tests/test_metacommands.py' |
130 | --- lalita/core/tests/test_metacommands.py 2010-01-17 19:03:28 +0000 |
131 | +++ lalita/core/tests/test_metacommands.py 2012-07-09 13:48:35 +0000 |
132 | @@ -4,6 +4,7 @@ |
133 | # License: GPL v3 |
134 | # For further info, see LICENSE file |
135 | |
136 | +import logging |
137 | import unittest |
138 | |
139 | from collections import defaultdict |
140 | @@ -11,7 +12,7 @@ |
141 | from lalita import dispatcher, events, ircbot |
142 | |
143 | ircbot_factory = ircbot.IRCBotFactory(dict(log_config="error", channels=defaultdict(lambda: {}))) |
144 | -ircbot.logger.setLevel("error") |
145 | +ircbot.logger.setLevel(logging.ERROR) |
146 | bot = ircbot.IrcBot() |
147 | bot.factory = ircbot_factory |
148 | bot.config = ircbot_factory.config |
149 | |
150 | === modified file 'lalita/plugins/tests/helper.py' |
151 | --- lalita/plugins/tests/helper.py 2010-03-08 13:52:37 +0000 |
152 | +++ lalita/plugins/tests/helper.py 2012-07-09 13:48:35 +0000 |
153 | @@ -2,6 +2,7 @@ |
154 | # License: GPL v3 |
155 | # For further info, see LICENSE file |
156 | |
157 | +import logging |
158 | import unittest |
159 | |
160 | from lalita import ircbot |
161 | @@ -27,8 +28,8 @@ |
162 | (plugin_name, config) = server_plugin |
163 | self.test_server["plugins"] = { plugin_name: config } |
164 | |
165 | - self.test_server["log_config"] = { plugin_name: "error" } |
166 | - ircbot.logger.setLevel("error") |
167 | + self.test_server["log_config"] = { plugin_name: "ERROR" } |
168 | + ircbot.logger.setLevel(logging.ERROR) |
169 | ircbot_factory = ircbot.IRCBotFactory(self.test_server) |
170 | self.bot = ircbot.IrcBot() |
171 | self.bot.factory = ircbot_factory |
172 | @@ -61,6 +62,11 @@ |
173 | else: |
174 | raise ValueError("The searched plugin does not exist!") |
175 | |
176 | + def tearDown(self): |
177 | + if hasattr(self, 'bot') and hasattr(self.bot, 'dispatcher'): |
178 | + self.bot.dispatcher.shutdown() |
179 | + |
180 | + |
181 | def assertMessageInAnswer(self, message_idx, expected): |
182 | """assert the content of message with index: message_idx in self.answer |
183 | and handle unexpected errors properly.""" |
184 | |
185 | === added file 'lalita/plugins/tests/test_zmq_proxy.py' |
186 | --- lalita/plugins/tests/test_zmq_proxy.py 1970-01-01 00:00:00 +0000 |
187 | +++ lalita/plugins/tests/test_zmq_proxy.py 2012-07-09 13:48:35 +0000 |
188 | @@ -0,0 +1,204 @@ |
189 | +# -*- coding: utf8 -*- |
190 | + |
191 | +# Copyright 2010 laliputienses |
192 | +# License: GPL v3 |
193 | +# For further info, see LICENSE file |
194 | +import json |
195 | +import time |
196 | + |
197 | +from twisted.trial.unittest import TestCase as TwistedTestCase |
198 | +from twisted.internet import defer, reactor, error |
199 | + |
200 | +from lalita import events |
201 | +from lalita.core.dispatcher import USER_POS, CHANNEL_POS |
202 | + |
203 | +from .helper import PluginTest |
204 | +from lalita.plugins.zmq_proxy import EVENT_MAP, PluginProcess |
205 | + |
206 | +try: |
207 | + import zmq, txzmq |
208 | + zmq_available = True |
209 | +except ImportError: |
210 | + zmq_available = False |
211 | + |
212 | + |
213 | +class TestZMQPlugin(TwistedTestCase, PluginTest): |
214 | + |
215 | + if not zmq_available: |
216 | + skip = "pyzmq and txzmq required." |
217 | + |
218 | + def setUp(self): |
219 | + super(TestZMQPlugin, self).setUp() |
220 | + self.init(server_plugin=("lalita.plugins.zmq_proxy.ZMQPlugin", |
221 | + {"events_address":"inproc://pub_addr", |
222 | + "bot_address":"inproc://sub_addr"})) |
223 | + self.ctx = zmq.Context.instance() |
224 | + self.sub = self.ctx.socket(zmq.SUB) |
225 | + while True: |
226 | + try: |
227 | + self.sub.connect("inproc://pub_addr") |
228 | + except zmq.ZMQError: |
229 | + continue |
230 | + else: |
231 | + break |
232 | + self.sub.setsockopt(zmq.SUBSCRIBE, "") |
233 | + self.cmd = self.ctx.socket(zmq.PUB) |
234 | + while True: |
235 | + try: |
236 | + self.cmd.connect("inproc://sub_addr") |
237 | + except zmq.ZMQError: |
238 | + time.sleep(0.2) |
239 | + continue |
240 | + else: |
241 | + break |
242 | + |
243 | + def tearDown(self): |
244 | + self.sub.close() |
245 | + self.cmd.close() |
246 | + self.plugin.shutdown() |
247 | + return super(TestZMQPlugin, self).tearDown() |
248 | + |
249 | + @defer.inlineCallbacks |
250 | + def test_say_action(self): |
251 | + """Say something via zmq.""" |
252 | + called = [] |
253 | + self.patch(self.plugin, 'say', lambda *a: called.append(a)) |
254 | + msg = {'action':'say', 'to_whom':"channel", "msg":"hola", "args":[]} |
255 | + self.cmd.send(json.dumps(msg)) |
256 | + d = defer.Deferred() |
257 | + reactor.callLater(0.2, lambda: d.callback(None)) |
258 | + yield d |
259 | + self.assertEqual(called[0], (msg['to_whom'].encode('utf-8'), msg['msg'].decode("utf-8"))) |
260 | + |
261 | + def test_event_handler(self): |
262 | + """Handle all events.""" |
263 | + called = [] |
264 | + self.patch(self.plugin, 'publish', lambda *a: called.append(a)) |
265 | + tested_events = { |
266 | + events.CONNECTION_MADE:(), |
267 | + events.CONNECTION_LOST:(), |
268 | + events.SIGNED_ON:(), |
269 | + events.JOINED:("channel",), |
270 | + events.PRIVATE_MESSAGE:("channel",), |
271 | + events.TALKED_TO_ME:("channel", "user"), |
272 | + events.PUBLIC_MESSAGE:("channel", "user"), |
273 | + events.ACTION:("channel", "user"), |
274 | + events.JOIN:("channel", "user"), |
275 | + events.LEFT:("channel", "user"), |
276 | + events.QUIT:("user",), |
277 | + events.KICK:("channel", "user"), |
278 | + events.RENAME:("user",) |
279 | + } |
280 | + for event, fixed_args in tested_events.items(): |
281 | + msg = "a message" |
282 | + args = fixed_args + (msg,) |
283 | + self.disp.push(event, *args) |
284 | + kwargs = {} |
285 | + if CHANNEL_POS[event] is not None: |
286 | + kwargs['channel'] = args[CHANNEL_POS[event]] |
287 | + if USER_POS[event] is not None: |
288 | + kwargs['user'] = args[USER_POS[event]] |
289 | + kwargs['args'] = [msg] |
290 | + self.assertIn((EVENT_MAP[event][0], kwargs), called) |
291 | + |
292 | + |
293 | +class TestPluginProcess(TwistedTestCase, PluginTest): |
294 | + """Tests for PluginProcess.""" |
295 | + |
296 | + class TestPlugin(PluginProcess): |
297 | + def __init__(self, addr1, addr2, called, config=None): |
298 | + self.called = called |
299 | + super(TestPluginProcess.TestPlugin, self).__init__(addr1, addr2, |
300 | + config=config) |
301 | + |
302 | + def init(self, config): |
303 | + self.config = config |
304 | + self.called.append(("init", config)) |
305 | + |
306 | + def _connect(self, events_address, bot_address): |
307 | + self.ctx = zmq.Context.instance() |
308 | + self.sub_socket = self.ctx.socket(zmq.SUB) |
309 | + while True: |
310 | + try: |
311 | + self.sub_socket.connect(events_address) |
312 | + except zmq.ZMQError: |
313 | + time.sleep(0.1) |
314 | + continue |
315 | + else: |
316 | + break |
317 | + self.sub_socket.setsockopt(zmq.SUBSCRIBE, "") |
318 | + self.bot_socket = self.ctx.socket(zmq.PUB) |
319 | + while True: |
320 | + try: |
321 | + self.bot_socket.connect(bot_address) |
322 | + except zmq.ZMQError: |
323 | + time.sleep(0.1) |
324 | + continue |
325 | + else: |
326 | + break |
327 | + |
328 | + def setUp(self): |
329 | + super(TestPluginProcess, self).setUp() |
330 | + events_address = "inproc://pub_addr" |
331 | + bot_address = "inproc://sub_addr" |
332 | + self.init(server_plugin=("lalita.plugins.zmq_proxy.ZMQPlugin", |
333 | + {"events_address":events_address, |
334 | + "bot_address":bot_address})) |
335 | + self.called = [] |
336 | + try: |
337 | + self.zmq_plugin = TestPluginProcess.TestPlugin(events_address, |
338 | + bot_address, |
339 | + self.called) |
340 | + except Exception, e: |
341 | + import traceback; |
342 | + traceback.print_exc() |
343 | + |
344 | + def tearDown(self): |
345 | + self.zmq_plugin.bot_socket.close() |
346 | + self.zmq_plugin.sub_socket.close() |
347 | + self.plugin.shutdown() |
348 | + return super(TestPluginProcess, self).tearDown() |
349 | + |
350 | + def test_init(self): |
351 | + """Test init is called.""" |
352 | + self.assertEquals(self.called, [("init", None)]) |
353 | + |
354 | + def test_register(self): |
355 | + """Register to en event.""" |
356 | + matcher = lambda a: True |
357 | + func = lambda *a: True |
358 | + self.zmq_plugin.register(events.JOINED, func, matcher) |
359 | + self.assertIn(events.JOINED, self.zmq_plugin._events) |
360 | + self.assertEquals(self.zmq_plugin._events[events.JOINED], (func, matcher)) |
361 | + |
362 | + def test_register_command(self): |
363 | + func = lambda a: None |
364 | + self.patch(self.zmq_plugin, '_send', self.called.append) |
365 | + self.zmq_plugin.register_command(func, "command") |
366 | + self.assertIn("irc.command", self.zmq_plugin._events) |
367 | + self.assertEquals(self.zmq_plugin._events["irc.command"][0][0], func) |
368 | + self.assertIn({'action':'register_command', |
369 | + 'command':["command"]}, self.called) |
370 | + |
371 | + def test_register_commands(self): |
372 | + func = lambda a: None |
373 | + self.patch(self.zmq_plugin, '_send', self.called.append) |
374 | + self.zmq_plugin.register_command(func, "command") |
375 | + self.assertIn("irc.command", self.zmq_plugin._events) |
376 | + self.assertEquals(self.zmq_plugin._events["irc.command"][0][0], func) |
377 | + self.assertIn({'action':'register_command', |
378 | + 'command':["command"]}, self.called) |
379 | + func1 = lambda a: None |
380 | + self.zmq_plugin.register_command(func1, "command1") |
381 | + self.assertEqual(len(self.zmq_plugin._events["irc.command"]), 2) |
382 | + self.assertEquals(self.zmq_plugin._events["irc.command"][1][0], func1) |
383 | + self.assertIn({'action':'register_command', |
384 | + 'command':["command1"]}, self.called) |
385 | + |
386 | + |
387 | + |
388 | + def test_say(self): |
389 | + self.patch(self.zmq_plugin, '_send', self.called.append) |
390 | + self.zmq_plugin.say("me", "message") |
391 | + self.assertIn({'action':'say', 'to_whom':'me', 'msg':"message", |
392 | + 'args':()}, self.called) |
393 | |
394 | === added directory 'lalita/plugins/zmq_plugins' |
395 | === added file 'lalita/plugins/zmq_plugins/example.py' |
396 | --- lalita/plugins/zmq_plugins/example.py 1970-01-01 00:00:00 +0000 |
397 | +++ lalita/plugins/zmq_plugins/example.py 2012-07-09 13:48:35 +0000 |
398 | @@ -0,0 +1,52 @@ |
399 | +# -*- coding: utf8 -*- |
400 | +# Copyright 2009-2012 laliputienses |
401 | +# License: GPL v3 |
402 | +# For further info, see LICENSE file |
403 | + |
404 | +from lalita.plugins.zmq_proxy import PluginProcess |
405 | + |
406 | + |
407 | +class Example(PluginProcess): |
408 | + """Example zmq-based plugin.""" |
409 | + |
410 | + def init(self, config): |
411 | + self.logger.info("Configuring Example Plugin!") |
412 | + # register the commands |
413 | + self.register_command(self.cmd_example, "example") |
414 | + self.register_command(self.cmd_example1, "example1") |
415 | + self.register("irc.private_message", self.example_priv) |
416 | + self.register("irc.talked_to_me", self.cmd_example) |
417 | + |
418 | + def example_priv(self, user, command, *args): |
419 | + """Just say something.""" |
420 | + self.say(user, "This is an example plugin.") |
421 | + |
422 | + def cmd_example(self, user, channel, command, *args): |
423 | + """Just say something.""" |
424 | + self.logger.debug("command %s from %s (args: %s)", command, user, args) |
425 | + self.say(channel, "This is an example plugin.") |
426 | + |
427 | + def cmd_example1(self, user, channel, command, *args): |
428 | + """Just say something.""" |
429 | + self.logger.debug("command %s from %s (args: %s)", command, user, args) |
430 | + self.say(channel, "Another example.") |
431 | + |
432 | + |
433 | +if __name__ == "__main__": |
434 | + import optparse |
435 | + parser = optparse.OptionParser() |
436 | + parser.add_option("-s", "--events-address", dest="events_address", |
437 | + default="tcp://127.0.0.1:9090") |
438 | + parser.add_option("-b", "--bot-address", dest="bot_address", |
439 | + default="tcp://127.0.0.1:9091") |
440 | + options, args = parser.parse_args() |
441 | + import logging |
442 | + logging.basicConfig() |
443 | + logging.getLogger().setLevel(logging.DEBUG) |
444 | + |
445 | + try: |
446 | + Example(options.events_address, options.bot_address).run() |
447 | + except: |
448 | + import traceback; |
449 | + traceback.print_exc() |
450 | + |
451 | |
452 | === added file 'lalita/plugins/zmq_proxy.py' |
453 | --- lalita/plugins/zmq_proxy.py 1970-01-01 00:00:00 +0000 |
454 | +++ lalita/plugins/zmq_proxy.py 2012-07-09 13:48:35 +0000 |
455 | @@ -0,0 +1,210 @@ |
456 | +# |
457 | +# Copyright 2009-2012 laliputienses |
458 | +# License: GPL v3 |
459 | +# For further info, see LICENSE file |
460 | +# |
461 | +"""ZeroMQ proxy plugin.""" |
462 | + |
463 | +import json |
464 | +import logging |
465 | +import re |
466 | + |
467 | +import zmq |
468 | + |
469 | +from txzmq.pubsub import ZmqSubConnection |
470 | +from txzmq import ZmqFactory, ZmqEndpoint |
471 | + |
472 | +from lalita import Plugin |
473 | +from lalita.core import events |
474 | + |
475 | + |
476 | +EVENT_MAP = { |
477 | + events.CONNECTION_MADE:("irc.connection_made", None, None), |
478 | + events.CONNECTION_LOST:("irc.connection_lost", None, None), |
479 | + events.SIGNED_ON:("irc.signed_on", None, None), |
480 | + events.JOINED:("irc.joined", 0, None), |
481 | + events.PRIVATE_MESSAGE:("irc.private_message", None, 0), |
482 | + events.TALKED_TO_ME:("irc.talked_to_me", 1, 0), |
483 | + events.COMMAND:("irc.command", 1, 0), |
484 | + events.PUBLIC_MESSAGE:("irc.public_message", 1, 0), |
485 | + events.ACTION:("irc.action", 1, 0), |
486 | + events.JOIN:("irc.join", 1, 0), |
487 | + events.LEFT:("irc.left", 1, 0), |
488 | + events.QUIT:("irc.quit", None, 0), |
489 | + events.KICK:("irc.kick", 1, 0), |
490 | + events.RENAME:("irc.rename", None, 0), |
491 | +} |
492 | + |
493 | + |
494 | +class BotConnection(ZmqSubConnection): |
495 | + """Bot zmq connection.""" |
496 | + |
497 | + def __init__(self, plugin, factory, endpoint): |
498 | + ZmqSubConnection.__init__(self, factory, endpoint) |
499 | + self.plugin = plugin |
500 | + |
501 | + def messageReceived(self, *a, **kw): |
502 | + return ZmqSubConnection.messageReceived(self, *a, **kw) |
503 | + |
504 | + def gotMessage(self, message): |
505 | + info = json.loads(message) |
506 | + if info['action'] == "say": |
507 | + if isinstance(info['to_whom'], unicode): |
508 | + self.plugin.say(info['to_whom'].encode('utf-8'), |
509 | + info['msg'], *info['args']) |
510 | + else: |
511 | + self.plugin.say(info['to_whom'], info['msg'], *info['args']) |
512 | + elif info["action"] == "register_command": |
513 | + self.plugin.register_command(info['command']) |
514 | + else: |
515 | + self.plugin.log.error("Invalid Action %s", message) |
516 | + |
517 | + |
518 | +class EventHandler(object): |
519 | + """Bridge of lalita event's to zmq messages.""" |
520 | + |
521 | + def __init__(self, plugin, event_name, channel_pos, user_pos): |
522 | + self.name = event_name |
523 | + self.channel_pos = channel_pos |
524 | + self.user_pos = user_pos |
525 | + self.plugin = plugin |
526 | + self.im_self = plugin |
527 | + self.im_func = self.__call__ |
528 | + self.logger = plugin.logger |
529 | + |
530 | + def __call__(self, *args): |
531 | + self.logger.debug("event: %s", args) |
532 | + msg_args = {} |
533 | + if self.channel_pos is not None: |
534 | + msg_args['channel'] = args[self.channel_pos] |
535 | + if self.user_pos is not None: |
536 | + msg_args['user'] = args[self.user_pos] |
537 | + msg_args['args'] = [a for i, a in enumerate(args) \ |
538 | + if i != self.channel_pos and i != self.user_pos] |
539 | + self.logger.debug("publishing: %s - %s", self.name, msg_args) |
540 | + self.plugin.publish(self.name, msg_args) |
541 | + |
542 | + |
543 | +class ZMQPlugin(Plugin): |
544 | + """ZeroMQ plugin.""" |
545 | + |
546 | + def init(self, config): |
547 | + self._plugins = {} |
548 | + self.config = config |
549 | + pub_address = self.config['events_address'] |
550 | + cmd_address = self.config['bot_address'] |
551 | + self.commands = set() # hold all the commands |
552 | + self.ctx = zmq.Context.instance() |
553 | + self.pub_socket = self.ctx.socket(zmq.PUB) |
554 | + self.pub_socket.bind(pub_address) |
555 | + # callback/command socket |
556 | + zmq_factory = ZmqFactory(context=self.ctx) |
557 | + rpc_endpoint = ZmqEndpoint("bind", cmd_address) |
558 | + self.cmd_socket = BotConnection(self, zmq_factory, rpc_endpoint) |
559 | + self.cmd_socket.subscribe("") |
560 | + for event, info in EVENT_MAP.items(): |
561 | + if event != events.COMMAND: |
562 | + self.register(event, EventHandler(self, *info), re.compile(".*")) |
563 | + |
564 | + def shutdown(self): |
565 | + if hasattr(self, 'cmd_socket'): |
566 | + self.cmd_socket.shutdown() |
567 | + if hasattr(self, 'pub_socket'): |
568 | + self.pub_socket.close() |
569 | + self.ctx.term() |
570 | + for proc in self._plugins.values(): |
571 | + proc.loseConnection() |
572 | + |
573 | + def register_command(self, commands): |
574 | + """Register a list of commands.""" |
575 | + if not self.commands.intersection(set(commands)): |
576 | + for command in commands: |
577 | + self.commands.add(command) |
578 | + self.register(events.COMMAND, |
579 | + EventHandler(self, *EVENT_MAP[events.COMMAND]), |
580 | + commands) |
581 | + # else, already registered. |
582 | + |
583 | + def publish(self, name, msg_args): |
584 | + """Publish a message/event.""" |
585 | + self.pub_socket.send(name, zmq.SNDMORE) |
586 | + self.pub_socket.send(json.dumps(msg_args)) |
587 | + |
588 | + |
589 | +class PluginProcess(object): |
590 | + """Base class for ZeroMQ plugins.""" |
591 | + |
592 | + def __init__(self, events_address, bot_address, config=None): |
593 | + self.ctx = None |
594 | + self.sub_socket = None |
595 | + self.cmd_socket = None |
596 | + self.config = config or {} |
597 | + self.logger = logging.getLogger("zmq_plugin.%s" % |
598 | + (self.__class__.__name__,)) |
599 | + self._connect(events_address, bot_address) |
600 | + # setup the event handler |
601 | + self._events = {} |
602 | + self.init(config) |
603 | + |
604 | + def _connect(self, events_address, bot_address): |
605 | + self.ctx = zmq.Context.instance() |
606 | + self.sub_socket = self.ctx.socket(zmq.SUB) |
607 | + self.sub_socket.connect(events_address) |
608 | + self.sub_socket.setsockopt(zmq.SUBSCRIBE, "irc") |
609 | + # create the bot socket |
610 | + self.bot_socket = self.ctx.socket(zmq.PUB) |
611 | + self.bot_socket.connect(bot_address) |
612 | + |
613 | + def init(self, config): |
614 | + """Subclass responsability.""" |
615 | + pass |
616 | + |
617 | + def _send(self, info): |
618 | + self.bot_socket.send(json.dumps(info)) |
619 | + |
620 | + def register_command(self, function, command_name): |
621 | + """Register a command.""" |
622 | + self._events.setdefault('irc.command', [])\ |
623 | + .append((function, lambda a: command_name in a)) |
624 | + new_msg = {'action':'register_command', 'command':[command_name]} |
625 | + self._send(new_msg) |
626 | + |
627 | + def register(self, event, function, matcher=None): |
628 | + """Register a event handler.""" |
629 | + self._events[event] = (function, matcher) |
630 | + |
631 | + def say(self, to_whom, msg, *args): |
632 | + """Say something.""" |
633 | + new_msg = {'action':'say', 'to_whom':to_whom, "msg":msg, "args":args} |
634 | + self._send(new_msg) |
635 | + |
636 | + def run(self): |
637 | + """Main loop""" |
638 | + while True: |
639 | + # block waiting for a message |
640 | + match = False |
641 | + event = self.sub_socket.recv() |
642 | + payload = json.loads(self.sub_socket.recv()) |
643 | + if event == "irc.command": |
644 | + commands = self._events[event] |
645 | + for handler, matcher in commands: |
646 | + if matcher(payload['args']): |
647 | + match = True |
648 | + break |
649 | + else: |
650 | + match = True |
651 | + try: |
652 | + handler, matcher = self._events[event] |
653 | + |
654 | + except KeyError: |
655 | + self.logger.error("No handler for %s", event) |
656 | + continue |
657 | + if match: |
658 | + user = payload.get('user') |
659 | + channel = payload.get('channel') |
660 | + args = [a for a in [user, channel] + payload['args'] \ |
661 | + if a is not None] |
662 | + handler(*args) |
663 | + else: |
664 | + self.logger.debug("No match for %s", payload) |
665 | + |
added info in the docs/*