Merge lp:~verterok/lalita/zmq-proxy into lp:lalita

Proposed by Guillermo Gonzalez
Status: Merged
Approved by: Facundo Batista
Approved revision: 179
Merged at revision: 173
Proposed branch: lp:~verterok/lalita/zmq-proxy
Merge into: lp:lalita
Diff against target: 665 lines (+497/-9)
11 files modified
docs/tutorial_en.rst (+5/-0)
docs/tutorial_sp.rst (+4/-0)
lalita.cfg.sample (+4/-0)
lalita/core/tests/test_dispatcher.py (+3/-2)
lalita/core/tests/test_events.py (+2/-1)
lalita/core/tests/test_ircbot.py (+3/-3)
lalita/core/tests/test_metacommands.py (+2/-1)
lalita/plugins/tests/helper.py (+8/-2)
lalita/plugins/tests/test_zmq_proxy.py (+204/-0)
lalita/plugins/zmq_plugins/example.py (+52/-0)
lalita/plugins/zmq_proxy.py (+210/-0)
To merge this branch: bzr merge lp:~verterok/lalita/zmq-proxy
Reviewer Review Type Date Requested Status
Facundo Batista Approve
Review via email: mp+113858@code.launchpad.net

Commit message

ZeroMQ proxy plugin: converts lalita into an IRC <-> ZeroMQ bridge (to create plugins in external processes)

Description of the change

Add a plugin that converts lalita into an IRC <-> ZeroMQ bridge; this allows plugins to be created in external processes.

To post a comment you must log in.
lp:~verterok/lalita/zmq-proxy updated
178. By Guillermo Gonzalez

add info about zmq-proxy plugin in the docs.

Revision history for this message
Guillermo Gonzalez (verterok) wrote :

added info in the docs/*

lp:~verterok/lalita/zmq-proxy updated
179. By Guillermo Gonzalez

fix typo

Revision history for this message
Facundo Batista (facundo) wrote :

Awesome, thanks!

review: Approve
lp:~verterok/lalita/zmq-proxy updated
180. By Guillermo Gonzalez

add copyright notice

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'docs/tutorial_en.rst'
2--- docs/tutorial_en.rst 2011-04-11 02:06:18 +0000
3+++ docs/tutorial_en.rst 2012-07-09 13:48:35 +0000
4@@ -633,6 +633,11 @@
5 - url.py: Recollects all the URLs that are mentioned on the different channels,
6 and you can search them afterwards.
7
8+- zmq_proxy.py: It's a IRC-ZeroMQ <http://www.zeromq.org/> proxy/bridge, that
9+ publish all lalita events to a PUB/SUB ZeroMQ socket and listen for commands
10+ in other socket (in json format). See zmq_plugins/example.py for an example
11+ plugin using this.
12+
13
14 Advanced configuration
15 ======================
16
17=== modified file 'docs/tutorial_sp.rst'
18--- docs/tutorial_sp.rst 2011-04-11 02:06:18 +0000
19+++ docs/tutorial_sp.rst 2012-07-09 13:48:35 +0000
20@@ -670,6 +670,10 @@
21 - url.py: Va juntando todas las URLs que se van mencionando en los
22 distintos canales, y luego nos permite buscar en las mismas.
23
24+- zmq_proxy.py: Es un proxy/bridge entre IRC y ZeroMQ <http://www.zeromq.org/>
25+ que publica todos los eventos de lalita en un socket pub/sub de ZeroMQ y
26+ escucha 'comandos' en otro socket (en formato json).
27+ Ver zmq_plugins/example.py para un ejemplo de plugin en otro proceso.
28
29 Configuración avanzada
30 ======================
31
32=== modified file 'lalita.cfg.sample'
33--- lalita.cfg.sample 2011-04-11 02:05:31 +0000
34+++ lalita.cfg.sample 2012-07-09 13:48:35 +0000
35@@ -55,6 +55,10 @@
36 # 'url.Url': { },
37 'lalita.plugins.photo.Photo': { },
38 'lalita.plugins.misc.Ping': { },
39+ 'zmq_proxy.ZMQPlugin':{
40+ 'events_address':'tcp://127.0.0.1:9090',
41+ 'bot_address':'tcp://127.0.0.1:9091',
42+ },
43 },
44 ),
45 'example': dict (
46
47=== modified file 'lalita/core/tests/test_dispatcher.py'
48--- lalita/core/tests/test_dispatcher.py 2010-06-22 00:12:42 +0000
49+++ lalita/core/tests/test_dispatcher.py 2012-07-09 13:48:35 +0000
50@@ -2,6 +2,7 @@
51 # License: GPL v3
52 # For further info, see LICENSE file
53
54+import logging
55 import re
56
57 from collections import defaultdict
58@@ -26,7 +27,7 @@
59 )
60
61 ircbot_factory = ircbot.IRCBotFactory(server)
62- ircbot.logger.setLevel("error")
63+ ircbot.logger.setLevel(logging.ERROR)
64 self.bot = ircbot.IrcBot()
65 self.bot.factory = ircbot_factory
66 self.bot.config = ircbot_factory.config
67@@ -213,7 +214,7 @@
68 if name == name.upper()]
69 for event in supported_events:
70 self.disp.register(event, self.helper.f)
71- if (event in dispatcher.USER_POS and
72+ if (event in dispatcher.USER_POS and
73 dispatcher.USER_POS[event] is not None):
74 self.disp.push(event, 'user', 'channel', 'msg')
75 elif (event in dispatcher.CHANNEL_POS and
76
77=== modified file 'lalita/core/tests/test_events.py'
78--- lalita/core/tests/test_events.py 2010-06-20 16:34:35 +0000
79+++ lalita/core/tests/test_events.py 2012-07-09 13:48:35 +0000
80@@ -2,6 +2,7 @@
81 # License: GPL v3
82 # For further info, see LICENSE file
83
84+import logging
85
86 from twisted.trial.unittest import TestCase as TwistedTestCase
87 from twisted.internet import defer
88@@ -21,7 +22,7 @@
89 def write(*a): pass
90
91 ircbot_factory = ircbot.IRCBotFactory(server)
92-ircbot.logger.setLevel("error")
93+ircbot.logger.setLevel(logging.ERROR)
94 bot = ircbot.IrcBot()
95 bot.factory = ircbot_factory
96 bot.msg = lambda *a:None
97
98=== modified file 'lalita/core/tests/test_ircbot.py'
99--- lalita/core/tests/test_ircbot.py 2010-06-16 11:45:13 +0000
100+++ lalita/core/tests/test_ircbot.py 2012-07-09 13:48:35 +0000
101@@ -47,7 +47,7 @@
102 def setUp(self):
103 server["log_config"] = {}
104 ircbot_factory = ircbot.IRCBotFactory(server)
105- ircbot.logger.setLevel("error")
106+ ircbot.logger.setLevel(logging.ERROR)
107 self.bot = bot = ircbot.IrcBot()
108 bot.factory = ircbot_factory
109 bot.config = ircbot_factory.config
110@@ -129,7 +129,7 @@
111
112 def setUp(self):
113 ircbot_factory = ircbot.IRCBotFactory(server)
114- ircbot.logger.setLevel("error")
115+ ircbot.logger.setLevel(logging.ERROR)
116 self.bot = bot = ircbot.IrcBot()
117 bot.factory = ircbot_factory
118 bot.config = ircbot_factory.config
119@@ -168,7 +168,7 @@
120
121 def setUp(self):
122 ircbot_factory = ircbot.IRCBotFactory(server.copy())
123- ircbot.logger.setLevel("error")
124+ ircbot.logger.setLevel(logging.ERROR)
125 self.bot = bot = ircbot.IrcBot()
126 bot.factory = ircbot_factory
127 bot.config = ircbot_factory.config
128
129=== modified file 'lalita/core/tests/test_metacommands.py'
130--- lalita/core/tests/test_metacommands.py 2010-01-17 19:03:28 +0000
131+++ lalita/core/tests/test_metacommands.py 2012-07-09 13:48:35 +0000
132@@ -4,6 +4,7 @@
133 # License: GPL v3
134 # For further info, see LICENSE file
135
136+import logging
137 import unittest
138
139 from collections import defaultdict
140@@ -11,7 +12,7 @@
141 from lalita import dispatcher, events, ircbot
142
143 ircbot_factory = ircbot.IRCBotFactory(dict(log_config="error", channels=defaultdict(lambda: {})))
144-ircbot.logger.setLevel("error")
145+ircbot.logger.setLevel(logging.ERROR)
146 bot = ircbot.IrcBot()
147 bot.factory = ircbot_factory
148 bot.config = ircbot_factory.config
149
150=== modified file 'lalita/plugins/tests/helper.py'
151--- lalita/plugins/tests/helper.py 2010-03-08 13:52:37 +0000
152+++ lalita/plugins/tests/helper.py 2012-07-09 13:48:35 +0000
153@@ -2,6 +2,7 @@
154 # License: GPL v3
155 # For further info, see LICENSE file
156
157+import logging
158 import unittest
159
160 from lalita import ircbot
161@@ -27,8 +28,8 @@
162 (plugin_name, config) = server_plugin
163 self.test_server["plugins"] = { plugin_name: config }
164
165- self.test_server["log_config"] = { plugin_name: "error" }
166- ircbot.logger.setLevel("error")
167+ self.test_server["log_config"] = { plugin_name: "ERROR" }
168+ ircbot.logger.setLevel(logging.ERROR)
169 ircbot_factory = ircbot.IRCBotFactory(self.test_server)
170 self.bot = ircbot.IrcBot()
171 self.bot.factory = ircbot_factory
172@@ -61,6 +62,11 @@
173 else:
174 raise ValueError("The searched plugin does not exist!")
175
176+ def tearDown(self):
177+ if hasattr(self, 'bot') and hasattr(self.bot, 'dispatcher'):
178+ self.bot.dispatcher.shutdown()
179+
180+
181 def assertMessageInAnswer(self, message_idx, expected):
182 """assert the content of message with index: message_idx in self.answer
183 and handle unexpected errors properly."""
184
185=== added file 'lalita/plugins/tests/test_zmq_proxy.py'
186--- lalita/plugins/tests/test_zmq_proxy.py 1970-01-01 00:00:00 +0000
187+++ lalita/plugins/tests/test_zmq_proxy.py 2012-07-09 13:48:35 +0000
188@@ -0,0 +1,204 @@
189+# -*- coding: utf8 -*-
190+
191+# Copyright 2010 laliputienses
192+# License: GPL v3
193+# For further info, see LICENSE file
194+import json
195+import time
196+
197+from twisted.trial.unittest import TestCase as TwistedTestCase
198+from twisted.internet import defer, reactor, error
199+
200+from lalita import events
201+from lalita.core.dispatcher import USER_POS, CHANNEL_POS
202+
203+from .helper import PluginTest
204+from lalita.plugins.zmq_proxy import EVENT_MAP, PluginProcess
205+
206+try:
207+ import zmq, txzmq
208+ zmq_available = True
209+except ImportError:
210+ zmq_available = False
211+
212+
213+class TestZMQPlugin(TwistedTestCase, PluginTest):
214+
215+ if not zmq_available:
216+ skip = "pyzmq and txzmq required."
217+
218+ def setUp(self):
219+ super(TestZMQPlugin, self).setUp()
220+ self.init(server_plugin=("lalita.plugins.zmq_proxy.ZMQPlugin",
221+ {"events_address":"inproc://pub_addr",
222+ "bot_address":"inproc://sub_addr"}))
223+ self.ctx = zmq.Context.instance()
224+ self.sub = self.ctx.socket(zmq.SUB)
225+ while True:
226+ try:
227+ self.sub.connect("inproc://pub_addr")
228+ except zmq.ZMQError:
229+ continue
230+ else:
231+ break
232+ self.sub.setsockopt(zmq.SUBSCRIBE, "")
233+ self.cmd = self.ctx.socket(zmq.PUB)
234+ while True:
235+ try:
236+ self.cmd.connect("inproc://sub_addr")
237+ except zmq.ZMQError:
238+ time.sleep(0.2)
239+ continue
240+ else:
241+ break
242+
243+ def tearDown(self):
244+ self.sub.close()
245+ self.cmd.close()
246+ self.plugin.shutdown()
247+ return super(TestZMQPlugin, self).tearDown()
248+
249+ @defer.inlineCallbacks
250+ def test_say_action(self):
251+ """Say something via zmq."""
252+ called = []
253+ self.patch(self.plugin, 'say', lambda *a: called.append(a))
254+ msg = {'action':'say', 'to_whom':"channel", "msg":"hola", "args":[]}
255+ self.cmd.send(json.dumps(msg))
256+ d = defer.Deferred()
257+ reactor.callLater(0.2, lambda: d.callback(None))
258+ yield d
259+ self.assertEqual(called[0], (msg['to_whom'].encode('utf-8'), msg['msg'].decode("utf-8")))
260+
261+ def test_event_handler(self):
262+ """Handle all events."""
263+ called = []
264+ self.patch(self.plugin, 'publish', lambda *a: called.append(a))
265+ tested_events = {
266+ events.CONNECTION_MADE:(),
267+ events.CONNECTION_LOST:(),
268+ events.SIGNED_ON:(),
269+ events.JOINED:("channel",),
270+ events.PRIVATE_MESSAGE:("channel",),
271+ events.TALKED_TO_ME:("channel", "user"),
272+ events.PUBLIC_MESSAGE:("channel", "user"),
273+ events.ACTION:("channel", "user"),
274+ events.JOIN:("channel", "user"),
275+ events.LEFT:("channel", "user"),
276+ events.QUIT:("user",),
277+ events.KICK:("channel", "user"),
278+ events.RENAME:("user",)
279+ }
280+ for event, fixed_args in tested_events.items():
281+ msg = "a message"
282+ args = fixed_args + (msg,)
283+ self.disp.push(event, *args)
284+ kwargs = {}
285+ if CHANNEL_POS[event] is not None:
286+ kwargs['channel'] = args[CHANNEL_POS[event]]
287+ if USER_POS[event] is not None:
288+ kwargs['user'] = args[USER_POS[event]]
289+ kwargs['args'] = [msg]
290+ self.assertIn((EVENT_MAP[event][0], kwargs), called)
291+
292+
293+class TestPluginProcess(TwistedTestCase, PluginTest):
294+ """Tests for PluginProcess."""
295+
296+ class TestPlugin(PluginProcess):
297+ def __init__(self, addr1, addr2, called, config=None):
298+ self.called = called
299+ super(TestPluginProcess.TestPlugin, self).__init__(addr1, addr2,
300+ config=config)
301+
302+ def init(self, config):
303+ self.config = config
304+ self.called.append(("init", config))
305+
306+ def _connect(self, events_address, bot_address):
307+ self.ctx = zmq.Context.instance()
308+ self.sub_socket = self.ctx.socket(zmq.SUB)
309+ while True:
310+ try:
311+ self.sub_socket.connect(events_address)
312+ except zmq.ZMQError:
313+ time.sleep(0.1)
314+ continue
315+ else:
316+ break
317+ self.sub_socket.setsockopt(zmq.SUBSCRIBE, "")
318+ self.bot_socket = self.ctx.socket(zmq.PUB)
319+ while True:
320+ try:
321+ self.bot_socket.connect(bot_address)
322+ except zmq.ZMQError:
323+ time.sleep(0.1)
324+ continue
325+ else:
326+ break
327+
328+ def setUp(self):
329+ super(TestPluginProcess, self).setUp()
330+ events_address = "inproc://pub_addr"
331+ bot_address = "inproc://sub_addr"
332+ self.init(server_plugin=("lalita.plugins.zmq_proxy.ZMQPlugin",
333+ {"events_address":events_address,
334+ "bot_address":bot_address}))
335+ self.called = []
336+ try:
337+ self.zmq_plugin = TestPluginProcess.TestPlugin(events_address,
338+ bot_address,
339+ self.called)
340+ except Exception, e:
341+ import traceback;
342+ traceback.print_exc()
343+
344+ def tearDown(self):
345+ self.zmq_plugin.bot_socket.close()
346+ self.zmq_plugin.sub_socket.close()
347+ self.plugin.shutdown()
348+ return super(TestPluginProcess, self).tearDown()
349+
350+ def test_init(self):
351+ """Test init is called."""
352+ self.assertEquals(self.called, [("init", None)])
353+
354+ def test_register(self):
355+ """Register to en event."""
356+ matcher = lambda a: True
357+ func = lambda *a: True
358+ self.zmq_plugin.register(events.JOINED, func, matcher)
359+ self.assertIn(events.JOINED, self.zmq_plugin._events)
360+ self.assertEquals(self.zmq_plugin._events[events.JOINED], (func, matcher))
361+
362+ def test_register_command(self):
363+ func = lambda a: None
364+ self.patch(self.zmq_plugin, '_send', self.called.append)
365+ self.zmq_plugin.register_command(func, "command")
366+ self.assertIn("irc.command", self.zmq_plugin._events)
367+ self.assertEquals(self.zmq_plugin._events["irc.command"][0][0], func)
368+ self.assertIn({'action':'register_command',
369+ 'command':["command"]}, self.called)
370+
371+ def test_register_commands(self):
372+ func = lambda a: None
373+ self.patch(self.zmq_plugin, '_send', self.called.append)
374+ self.zmq_plugin.register_command(func, "command")
375+ self.assertIn("irc.command", self.zmq_plugin._events)
376+ self.assertEquals(self.zmq_plugin._events["irc.command"][0][0], func)
377+ self.assertIn({'action':'register_command',
378+ 'command':["command"]}, self.called)
379+ func1 = lambda a: None
380+ self.zmq_plugin.register_command(func1, "command1")
381+ self.assertEqual(len(self.zmq_plugin._events["irc.command"]), 2)
382+ self.assertEquals(self.zmq_plugin._events["irc.command"][1][0], func1)
383+ self.assertIn({'action':'register_command',
384+ 'command':["command1"]}, self.called)
385+
386+
387+
388+ def test_say(self):
389+ self.patch(self.zmq_plugin, '_send', self.called.append)
390+ self.zmq_plugin.say("me", "message")
391+ self.assertIn({'action':'say', 'to_whom':'me', 'msg':"message",
392+ 'args':()}, self.called)
393
394=== added directory 'lalita/plugins/zmq_plugins'
395=== added file 'lalita/plugins/zmq_plugins/example.py'
396--- lalita/plugins/zmq_plugins/example.py 1970-01-01 00:00:00 +0000
397+++ lalita/plugins/zmq_plugins/example.py 2012-07-09 13:48:35 +0000
398@@ -0,0 +1,52 @@
399+# -*- coding: utf8 -*-
400+# Copyright 2009-2012 laliputienses
401+# License: GPL v3
402+# For further info, see LICENSE file
403+
404+from lalita.plugins.zmq_proxy import PluginProcess
405+
406+
407+class Example(PluginProcess):
408+ """Example zmq-based plugin."""
409+
410+ def init(self, config):
411+ self.logger.info("Configuring Example Plugin!")
412+ # register the commands
413+ self.register_command(self.cmd_example, "example")
414+ self.register_command(self.cmd_example1, "example1")
415+ self.register("irc.private_message", self.example_priv)
416+ self.register("irc.talked_to_me", self.cmd_example)
417+
418+ def example_priv(self, user, command, *args):
419+ """Just say something."""
420+ self.say(user, "This is an example plugin.")
421+
422+ def cmd_example(self, user, channel, command, *args):
423+ """Just say something."""
424+ self.logger.debug("command %s from %s (args: %s)", command, user, args)
425+ self.say(channel, "This is an example plugin.")
426+
427+ def cmd_example1(self, user, channel, command, *args):
428+ """Just say something."""
429+ self.logger.debug("command %s from %s (args: %s)", command, user, args)
430+ self.say(channel, "Another example.")
431+
432+
433+if __name__ == "__main__":
434+ import optparse
435+ parser = optparse.OptionParser()
436+ parser.add_option("-s", "--events-address", dest="events_address",
437+ default="tcp://127.0.0.1:9090")
438+ parser.add_option("-b", "--bot-address", dest="bot_address",
439+ default="tcp://127.0.0.1:9091")
440+ options, args = parser.parse_args()
441+ import logging
442+ logging.basicConfig()
443+ logging.getLogger().setLevel(logging.DEBUG)
444+
445+ try:
446+ Example(options.events_address, options.bot_address).run()
447+ except:
448+ import traceback;
449+ traceback.print_exc()
450+
451
452=== added file 'lalita/plugins/zmq_proxy.py'
453--- lalita/plugins/zmq_proxy.py 1970-01-01 00:00:00 +0000
454+++ lalita/plugins/zmq_proxy.py 2012-07-09 13:48:35 +0000
455@@ -0,0 +1,210 @@
456+#
457+# Copyright 2009-2012 laliputienses
458+# License: GPL v3
459+# For further info, see LICENSE file
460+#
461+"""ZeroMQ proxy plugin."""
462+
463+import json
464+import logging
465+import re
466+
467+import zmq
468+
469+from txzmq.pubsub import ZmqSubConnection
470+from txzmq import ZmqFactory, ZmqEndpoint
471+
472+from lalita import Plugin
473+from lalita.core import events
474+
475+
476+EVENT_MAP = {
477+ events.CONNECTION_MADE:("irc.connection_made", None, None),
478+ events.CONNECTION_LOST:("irc.connection_lost", None, None),
479+ events.SIGNED_ON:("irc.signed_on", None, None),
480+ events.JOINED:("irc.joined", 0, None),
481+ events.PRIVATE_MESSAGE:("irc.private_message", None, 0),
482+ events.TALKED_TO_ME:("irc.talked_to_me", 1, 0),
483+ events.COMMAND:("irc.command", 1, 0),
484+ events.PUBLIC_MESSAGE:("irc.public_message", 1, 0),
485+ events.ACTION:("irc.action", 1, 0),
486+ events.JOIN:("irc.join", 1, 0),
487+ events.LEFT:("irc.left", 1, 0),
488+ events.QUIT:("irc.quit", None, 0),
489+ events.KICK:("irc.kick", 1, 0),
490+ events.RENAME:("irc.rename", None, 0),
491+}
492+
493+
494+class BotConnection(ZmqSubConnection):
495+ """Bot zmq connection."""
496+
497+ def __init__(self, plugin, factory, endpoint):
498+ ZmqSubConnection.__init__(self, factory, endpoint)
499+ self.plugin = plugin
500+
501+ def messageReceived(self, *a, **kw):
502+ return ZmqSubConnection.messageReceived(self, *a, **kw)
503+
504+ def gotMessage(self, message):
505+ info = json.loads(message)
506+ if info['action'] == "say":
507+ if isinstance(info['to_whom'], unicode):
508+ self.plugin.say(info['to_whom'].encode('utf-8'),
509+ info['msg'], *info['args'])
510+ else:
511+ self.plugin.say(info['to_whom'], info['msg'], *info['args'])
512+ elif info["action"] == "register_command":
513+ self.plugin.register_command(info['command'])
514+ else:
515+ self.plugin.log.error("Invalid Action %s", message)
516+
517+
518+class EventHandler(object):
519+ """Bridge of lalita event's to zmq messages."""
520+
521+ def __init__(self, plugin, event_name, channel_pos, user_pos):
522+ self.name = event_name
523+ self.channel_pos = channel_pos
524+ self.user_pos = user_pos
525+ self.plugin = plugin
526+ self.im_self = plugin
527+ self.im_func = self.__call__
528+ self.logger = plugin.logger
529+
530+ def __call__(self, *args):
531+ self.logger.debug("event: %s", args)
532+ msg_args = {}
533+ if self.channel_pos is not None:
534+ msg_args['channel'] = args[self.channel_pos]
535+ if self.user_pos is not None:
536+ msg_args['user'] = args[self.user_pos]
537+ msg_args['args'] = [a for i, a in enumerate(args) \
538+ if i != self.channel_pos and i != self.user_pos]
539+ self.logger.debug("publishing: %s - %s", self.name, msg_args)
540+ self.plugin.publish(self.name, msg_args)
541+
542+
543+class ZMQPlugin(Plugin):
544+ """ZeroMQ plugin."""
545+
546+ def init(self, config):
547+ self._plugins = {}
548+ self.config = config
549+ pub_address = self.config['events_address']
550+ cmd_address = self.config['bot_address']
551+ self.commands = set() # hold all the commands
552+ self.ctx = zmq.Context.instance()
553+ self.pub_socket = self.ctx.socket(zmq.PUB)
554+ self.pub_socket.bind(pub_address)
555+ # callback/command socket
556+ zmq_factory = ZmqFactory(context=self.ctx)
557+ rpc_endpoint = ZmqEndpoint("bind", cmd_address)
558+ self.cmd_socket = BotConnection(self, zmq_factory, rpc_endpoint)
559+ self.cmd_socket.subscribe("")
560+ for event, info in EVENT_MAP.items():
561+ if event != events.COMMAND:
562+ self.register(event, EventHandler(self, *info), re.compile(".*"))
563+
564+ def shutdown(self):
565+ if hasattr(self, 'cmd_socket'):
566+ self.cmd_socket.shutdown()
567+ if hasattr(self, 'pub_socket'):
568+ self.pub_socket.close()
569+ self.ctx.term()
570+ for proc in self._plugins.values():
571+ proc.loseConnection()
572+
573+ def register_command(self, commands):
574+ """Register a list of commands."""
575+ if not self.commands.intersection(set(commands)):
576+ for command in commands:
577+ self.commands.add(command)
578+ self.register(events.COMMAND,
579+ EventHandler(self, *EVENT_MAP[events.COMMAND]),
580+ commands)
581+ # else, already registered.
582+
583+ def publish(self, name, msg_args):
584+ """Publish a message/event."""
585+ self.pub_socket.send(name, zmq.SNDMORE)
586+ self.pub_socket.send(json.dumps(msg_args))
587+
588+
589+class PluginProcess(object):
590+ """Base class for ZeroMQ plugins."""
591+
592+ def __init__(self, events_address, bot_address, config=None):
593+ self.ctx = None
594+ self.sub_socket = None
595+ self.cmd_socket = None
596+ self.config = config or {}
597+ self.logger = logging.getLogger("zmq_plugin.%s" %
598+ (self.__class__.__name__,))
599+ self._connect(events_address, bot_address)
600+ # setup the event handler
601+ self._events = {}
602+ self.init(config)
603+
604+ def _connect(self, events_address, bot_address):
605+ self.ctx = zmq.Context.instance()
606+ self.sub_socket = self.ctx.socket(zmq.SUB)
607+ self.sub_socket.connect(events_address)
608+ self.sub_socket.setsockopt(zmq.SUBSCRIBE, "irc")
609+ # create the bot socket
610+ self.bot_socket = self.ctx.socket(zmq.PUB)
611+ self.bot_socket.connect(bot_address)
612+
613+ def init(self, config):
614+ """Subclass responsability."""
615+ pass
616+
617+ def _send(self, info):
618+ self.bot_socket.send(json.dumps(info))
619+
620+ def register_command(self, function, command_name):
621+ """Register a command."""
622+ self._events.setdefault('irc.command', [])\
623+ .append((function, lambda a: command_name in a))
624+ new_msg = {'action':'register_command', 'command':[command_name]}
625+ self._send(new_msg)
626+
627+ def register(self, event, function, matcher=None):
628+ """Register a event handler."""
629+ self._events[event] = (function, matcher)
630+
631+ def say(self, to_whom, msg, *args):
632+ """Say something."""
633+ new_msg = {'action':'say', 'to_whom':to_whom, "msg":msg, "args":args}
634+ self._send(new_msg)
635+
636+ def run(self):
637+ """Main loop"""
638+ while True:
639+ # block waiting for a message
640+ match = False
641+ event = self.sub_socket.recv()
642+ payload = json.loads(self.sub_socket.recv())
643+ if event == "irc.command":
644+ commands = self._events[event]
645+ for handler, matcher in commands:
646+ if matcher(payload['args']):
647+ match = True
648+ break
649+ else:
650+ match = True
651+ try:
652+ handler, matcher = self._events[event]
653+
654+ except KeyError:
655+ self.logger.error("No handler for %s", event)
656+ continue
657+ if match:
658+ user = payload.get('user')
659+ channel = payload.get('channel')
660+ args = [a for a in [user, channel] + payload['args'] \
661+ if a is not None]
662+ handler(*args)
663+ else:
664+ self.logger.debug("No match for %s", payload)
665+

Subscribers

People subscribed via source and target branches