Merge lp:~verterok/lalita/zmq-proxy into lp:lalita

Proposed by Guillermo Gonzalez
Status: Merged
Approved by: Facundo Batista
Approved revision: 179
Merged at revision: 173
Proposed branch: lp:~verterok/lalita/zmq-proxy
Merge into: lp:lalita
Diff against target: 665 lines (+497/-9)
11 files modified
docs/tutorial_en.rst (+5/-0)
docs/tutorial_sp.rst (+4/-0)
lalita.cfg.sample (+4/-0)
lalita/core/tests/test_dispatcher.py (+3/-2)
lalita/core/tests/test_events.py (+2/-1)
lalita/core/tests/test_ircbot.py (+3/-3)
lalita/core/tests/test_metacommands.py (+2/-1)
lalita/plugins/tests/helper.py (+8/-2)
lalita/plugins/tests/test_zmq_proxy.py (+204/-0)
lalita/plugins/zmq_plugins/example.py (+52/-0)
lalita/plugins/zmq_proxy.py (+210/-0)
To merge this branch: bzr merge lp:~verterok/lalita/zmq-proxy
Reviewer Review Type Date Requested Status
Facundo Batista Approve
Review via email: mp+113858@code.launchpad.net

Commit message

ZeroMQ proxy plugin: converts lalita into an IRC <-> ZeroMQ bridge (so plugins can be written as external processes)

Description of the change

Add a plugin that converts lalita into an IRC <-> ZeroMQ bridge; this allows plugins to be created in external processes.

To post a comment you must log in.
lp:~verterok/lalita/zmq-proxy updated
178. By Guillermo Gonzalez

add info about zmq-proxy plugin in the docs.

Revision history for this message
Guillermo Gonzalez (verterok) wrote :

added info in the docs/*

lp:~verterok/lalita/zmq-proxy updated
179. By Guillermo Gonzalez

fix typo

Revision history for this message
Facundo Batista (facundo) wrote :

Awesome, thanks!

review: Approve
lp:~verterok/lalita/zmq-proxy updated
180. By Guillermo Gonzalez

add copyright notice

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'docs/tutorial_en.rst'
--- docs/tutorial_en.rst 2011-04-11 02:06:18 +0000
+++ docs/tutorial_en.rst 2012-07-09 13:48:35 +0000
@@ -633,6 +633,11 @@
633- url.py: Recollects all the URLs that are mentioned on the different channels,633- url.py: Recollects all the URLs that are mentioned on the different channels,
634 and you can search them afterwards.634 and you can search them afterwards.
635635
636- zmq_proxy.py: An IRC-ZeroMQ <http://www.zeromq.org/> proxy/bridge that
637 publishes all lalita events to a PUB/SUB ZeroMQ socket and listens for
638 commands on another socket (in JSON format). See zmq_plugins/example.py for
639 an example plugin using this.
640
636641
637Advanced configuration642Advanced configuration
638======================643======================
639644
=== modified file 'docs/tutorial_sp.rst'
--- docs/tutorial_sp.rst 2011-04-11 02:06:18 +0000
+++ docs/tutorial_sp.rst 2012-07-09 13:48:35 +0000
@@ -670,6 +670,10 @@
670- url.py: Va juntando todas las URLs que se van mencionando en los670- url.py: Va juntando todas las URLs que se van mencionando en los
671 distintos canales, y luego nos permite buscar en las mismas.671 distintos canales, y luego nos permite buscar en las mismas.
672672
673- zmq_proxy.py: Es un proxy/bridge entre IRC y ZeroMQ <http://www.zeromq.org/>
674 que publica todos los eventos de lalita en un socket pub/sub de ZeroMQ y
675 escucha 'comandos' en otro socket (en formato json).
676 Ver zmq_plugins/example.py para un ejemplo de plugin en otro proceso.
673677
674Configuración avanzada678Configuración avanzada
675======================679======================
676680
=== modified file 'lalita.cfg.sample'
--- lalita.cfg.sample 2011-04-11 02:05:31 +0000
+++ lalita.cfg.sample 2012-07-09 13:48:35 +0000
@@ -55,6 +55,10 @@
55 # 'url.Url': { },55 # 'url.Url': { },
56 'lalita.plugins.photo.Photo': { },56 'lalita.plugins.photo.Photo': { },
57 'lalita.plugins.misc.Ping': { },57 'lalita.plugins.misc.Ping': { },
58 'zmq_proxy.ZMQPlugin':{
59 'events_address':'tcp://127.0.0.1:9090',
60 'bot_address':'tcp://127.0.0.1:9091',
61 },
58 },62 },
59 ),63 ),
60 'example': dict (64 'example': dict (
6165
=== modified file 'lalita/core/tests/test_dispatcher.py'
--- lalita/core/tests/test_dispatcher.py 2010-06-22 00:12:42 +0000
+++ lalita/core/tests/test_dispatcher.py 2012-07-09 13:48:35 +0000
@@ -2,6 +2,7 @@
2# License: GPL v32# License: GPL v3
3# For further info, see LICENSE file3# For further info, see LICENSE file
44
5import logging
5import re6import re
67
7from collections import defaultdict8from collections import defaultdict
@@ -26,7 +27,7 @@
26 )27 )
2728
28 ircbot_factory = ircbot.IRCBotFactory(server)29 ircbot_factory = ircbot.IRCBotFactory(server)
29 ircbot.logger.setLevel("error")30 ircbot.logger.setLevel(logging.ERROR)
30 self.bot = ircbot.IrcBot()31 self.bot = ircbot.IrcBot()
31 self.bot.factory = ircbot_factory32 self.bot.factory = ircbot_factory
32 self.bot.config = ircbot_factory.config33 self.bot.config = ircbot_factory.config
@@ -213,7 +214,7 @@
213 if name == name.upper()]214 if name == name.upper()]
214 for event in supported_events:215 for event in supported_events:
215 self.disp.register(event, self.helper.f)216 self.disp.register(event, self.helper.f)
216 if (event in dispatcher.USER_POS and 217 if (event in dispatcher.USER_POS and
217 dispatcher.USER_POS[event] is not None):218 dispatcher.USER_POS[event] is not None):
218 self.disp.push(event, 'user', 'channel', 'msg')219 self.disp.push(event, 'user', 'channel', 'msg')
219 elif (event in dispatcher.CHANNEL_POS and220 elif (event in dispatcher.CHANNEL_POS and
220221
=== modified file 'lalita/core/tests/test_events.py'
--- lalita/core/tests/test_events.py 2010-06-20 16:34:35 +0000
+++ lalita/core/tests/test_events.py 2012-07-09 13:48:35 +0000
@@ -2,6 +2,7 @@
2# License: GPL v32# License: GPL v3
3# For further info, see LICENSE file3# For further info, see LICENSE file
44
5import logging
56
6from twisted.trial.unittest import TestCase as TwistedTestCase7from twisted.trial.unittest import TestCase as TwistedTestCase
7from twisted.internet import defer8from twisted.internet import defer
@@ -21,7 +22,7 @@
21 def write(*a): pass22 def write(*a): pass
2223
23ircbot_factory = ircbot.IRCBotFactory(server)24ircbot_factory = ircbot.IRCBotFactory(server)
24ircbot.logger.setLevel("error")25ircbot.logger.setLevel(logging.ERROR)
25bot = ircbot.IrcBot()26bot = ircbot.IrcBot()
26bot.factory = ircbot_factory27bot.factory = ircbot_factory
27bot.msg = lambda *a:None28bot.msg = lambda *a:None
2829
=== modified file 'lalita/core/tests/test_ircbot.py'
--- lalita/core/tests/test_ircbot.py 2010-06-16 11:45:13 +0000
+++ lalita/core/tests/test_ircbot.py 2012-07-09 13:48:35 +0000
@@ -47,7 +47,7 @@
47 def setUp(self):47 def setUp(self):
48 server["log_config"] = {}48 server["log_config"] = {}
49 ircbot_factory = ircbot.IRCBotFactory(server)49 ircbot_factory = ircbot.IRCBotFactory(server)
50 ircbot.logger.setLevel("error")50 ircbot.logger.setLevel(logging.ERROR)
51 self.bot = bot = ircbot.IrcBot()51 self.bot = bot = ircbot.IrcBot()
52 bot.factory = ircbot_factory52 bot.factory = ircbot_factory
53 bot.config = ircbot_factory.config53 bot.config = ircbot_factory.config
@@ -129,7 +129,7 @@
129129
130 def setUp(self):130 def setUp(self):
131 ircbot_factory = ircbot.IRCBotFactory(server)131 ircbot_factory = ircbot.IRCBotFactory(server)
132 ircbot.logger.setLevel("error")132 ircbot.logger.setLevel(logging.ERROR)
133 self.bot = bot = ircbot.IrcBot()133 self.bot = bot = ircbot.IrcBot()
134 bot.factory = ircbot_factory134 bot.factory = ircbot_factory
135 bot.config = ircbot_factory.config135 bot.config = ircbot_factory.config
@@ -168,7 +168,7 @@
168168
169 def setUp(self):169 def setUp(self):
170 ircbot_factory = ircbot.IRCBotFactory(server.copy())170 ircbot_factory = ircbot.IRCBotFactory(server.copy())
171 ircbot.logger.setLevel("error")171 ircbot.logger.setLevel(logging.ERROR)
172 self.bot = bot = ircbot.IrcBot()172 self.bot = bot = ircbot.IrcBot()
173 bot.factory = ircbot_factory173 bot.factory = ircbot_factory
174 bot.config = ircbot_factory.config174 bot.config = ircbot_factory.config
175175
=== modified file 'lalita/core/tests/test_metacommands.py'
--- lalita/core/tests/test_metacommands.py 2010-01-17 19:03:28 +0000
+++ lalita/core/tests/test_metacommands.py 2012-07-09 13:48:35 +0000
@@ -4,6 +4,7 @@
4# License: GPL v34# License: GPL v3
5# For further info, see LICENSE file5# For further info, see LICENSE file
66
7import logging
7import unittest8import unittest
89
9from collections import defaultdict10from collections import defaultdict
@@ -11,7 +12,7 @@
11from lalita import dispatcher, events, ircbot12from lalita import dispatcher, events, ircbot
1213
13ircbot_factory = ircbot.IRCBotFactory(dict(log_config="error", channels=defaultdict(lambda: {})))14ircbot_factory = ircbot.IRCBotFactory(dict(log_config="error", channels=defaultdict(lambda: {})))
14ircbot.logger.setLevel("error")15ircbot.logger.setLevel(logging.ERROR)
15bot = ircbot.IrcBot()16bot = ircbot.IrcBot()
16bot.factory = ircbot_factory17bot.factory = ircbot_factory
17bot.config = ircbot_factory.config18bot.config = ircbot_factory.config
1819
=== modified file 'lalita/plugins/tests/helper.py'
--- lalita/plugins/tests/helper.py 2010-03-08 13:52:37 +0000
+++ lalita/plugins/tests/helper.py 2012-07-09 13:48:35 +0000
@@ -2,6 +2,7 @@
2# License: GPL v32# License: GPL v3
3# For further info, see LICENSE file3# For further info, see LICENSE file
44
5import logging
5import unittest6import unittest
67
7from lalita import ircbot8from lalita import ircbot
@@ -27,8 +28,8 @@
27 (plugin_name, config) = server_plugin28 (plugin_name, config) = server_plugin
28 self.test_server["plugins"] = { plugin_name: config }29 self.test_server["plugins"] = { plugin_name: config }
2930
30 self.test_server["log_config"] = { plugin_name: "error" }31 self.test_server["log_config"] = { plugin_name: "ERROR" }
31 ircbot.logger.setLevel("error")32 ircbot.logger.setLevel(logging.ERROR)
32 ircbot_factory = ircbot.IRCBotFactory(self.test_server)33 ircbot_factory = ircbot.IRCBotFactory(self.test_server)
33 self.bot = ircbot.IrcBot()34 self.bot = ircbot.IrcBot()
34 self.bot.factory = ircbot_factory35 self.bot.factory = ircbot_factory
@@ -61,6 +62,11 @@
61 else:62 else:
62 raise ValueError("The searched plugin does not exist!")63 raise ValueError("The searched plugin does not exist!")
6364
def tearDown(self):
    """Shut down the bot's dispatcher, if this test has one."""
    # Nothing to shut down unless both the bot and its dispatcher exist.
    if not hasattr(self, 'bot'):
        return
    if not hasattr(self.bot, 'dispatcher'):
        return
    self.bot.dispatcher.shutdown()
68
69
64 def assertMessageInAnswer(self, message_idx, expected):70 def assertMessageInAnswer(self, message_idx, expected):
65 """assert the content of message with index: message_idx in self.answer71 """assert the content of message with index: message_idx in self.answer
66 and handle unexpected errors properly."""72 and handle unexpected errors properly."""
6773
=== added file 'lalita/plugins/tests/test_zmq_proxy.py'
--- lalita/plugins/tests/test_zmq_proxy.py 1970-01-01 00:00:00 +0000
+++ lalita/plugins/tests/test_zmq_proxy.py 2012-07-09 13:48:35 +0000
@@ -0,0 +1,204 @@
1# -*- coding: utf8 -*-
2
3# Copyright 2010 laliputienses
4# License: GPL v3
5# For further info, see LICENSE file
6import json
7import time
8
9from twisted.trial.unittest import TestCase as TwistedTestCase
10from twisted.internet import defer, reactor, error
11
12from lalita import events
13from lalita.core.dispatcher import USER_POS, CHANNEL_POS
14
15from .helper import PluginTest
16from lalita.plugins.zmq_proxy import EVENT_MAP, PluginProcess
17
18try:
19 import zmq, txzmq
20 zmq_available = True
21except ImportError:
22 zmq_available = False
23
24
class TestZMQPlugin(TwistedTestCase, PluginTest):
    """Tests for ZMQPlugin, driven through inproc ZeroMQ sockets."""

    if not zmq_available:
        skip = "pyzmq and txzmq required."

    def _connect_retrying(self, socket, address, delay=0.2):
        """Connect *socket* to *address*, sleeping between failed attempts.

        Both test sockets go through this helper so that neither of them
        spins the CPU while waiting for the endpoint to become available.
        """
        while True:
            try:
                socket.connect(address)
            except zmq.ZMQError:
                time.sleep(delay)
            else:
                return

    def setUp(self):
        """Load the plugin and connect a SUB and a PUB socket to it."""
        super(TestZMQPlugin, self).setUp()
        self.init(server_plugin=("lalita.plugins.zmq_proxy.ZMQPlugin",
                                 {"events_address": "inproc://pub_addr",
                                  "bot_address": "inproc://sub_addr"}))
        self.ctx = zmq.Context.instance()
        # socket used to read the events published by the plugin
        self.sub = self.ctx.socket(zmq.SUB)
        self._connect_retrying(self.sub, "inproc://pub_addr")
        self.sub.setsockopt(zmq.SUBSCRIBE, "")
        # socket used to send requests to the plugin
        self.cmd = self.ctx.socket(zmq.PUB)
        self._connect_retrying(self.cmd, "inproc://sub_addr")

    def tearDown(self):
        """Close the test sockets and shut the plugin down."""
        self.sub.close()
        self.cmd.close()
        self.plugin.shutdown()
        return super(TestZMQPlugin, self).tearDown()

    @defer.inlineCallbacks
    def test_say_action(self):
        """Say something via zmq."""
        called = []
        self.patch(self.plugin, 'say', lambda *a: called.append(a))
        msg = {'action': 'say', 'to_whom': "channel", "msg": "hola",
               "args": []}
        self.cmd.send(json.dumps(msg))
        # give the reactor some time to deliver the message to the plugin
        d = defer.Deferred()
        reactor.callLater(0.2, lambda: d.callback(None))
        yield d
        self.assertEqual(called[0], (msg['to_whom'].encode('utf-8'),
                                     msg['msg'].decode("utf-8")))

    def test_event_handler(self):
        """Handle all events."""
        called = []
        self.patch(self.plugin, 'publish', lambda *a: called.append(a))
        tested_events = {
            events.CONNECTION_MADE: (),
            events.CONNECTION_LOST: (),
            events.SIGNED_ON: (),
            events.JOINED: ("channel",),
            events.PRIVATE_MESSAGE: ("channel",),
            events.TALKED_TO_ME: ("channel", "user"),
            events.PUBLIC_MESSAGE: ("channel", "user"),
            events.ACTION: ("channel", "user"),
            events.JOIN: ("channel", "user"),
            events.LEFT: ("channel", "user"),
            events.QUIT: ("user",),
            events.KICK: ("channel", "user"),
            events.RENAME: ("user",)
        }
        for event, fixed_args in tested_events.items():
            msg = "a message"
            args = fixed_args + (msg,)
            self.disp.push(event, *args)
            # build the payload the plugin is expected to publish
            kwargs = {}
            if CHANNEL_POS[event] is not None:
                kwargs['channel'] = args[CHANNEL_POS[event]]
            if USER_POS[event] is not None:
                kwargs['user'] = args[USER_POS[event]]
            kwargs['args'] = [msg]
            self.assertIn((EVENT_MAP[event][0], kwargs), called)
103
104
class TestPluginProcess(TwistedTestCase, PluginTest):
    """Tests for PluginProcess."""

    class TestPlugin(PluginProcess):
        """PluginProcess that records init() calls and retries connects."""

        def __init__(self, addr1, addr2, called, config=None):
            # must be set first: the base __init__ ends up calling init()
            self.called = called
            super(TestPluginProcess.TestPlugin, self).__init__(addr1, addr2,
                                                               config=config)

        def init(self, config):
            self.config = config
            self.called.append(("init", config))

        @staticmethod
        def _connect_retrying(socket, address, delay=0.1):
            """Connect *socket* to *address*, sleeping between attempts."""
            while True:
                try:
                    socket.connect(address)
                except zmq.ZMQError:
                    time.sleep(delay)
                else:
                    return

        def _connect(self, events_address, bot_address):
            self.ctx = zmq.Context.instance()
            self.sub_socket = self.ctx.socket(zmq.SUB)
            self._connect_retrying(self.sub_socket, events_address)
            self.sub_socket.setsockopt(zmq.SUBSCRIBE, "")
            self.bot_socket = self.ctx.socket(zmq.PUB)
            self._connect_retrying(self.bot_socket, bot_address)

    def setUp(self):
        """Load the proxy plugin and build a PluginProcess talking to it."""
        super(TestPluginProcess, self).setUp()
        events_address = "inproc://pub_addr"
        bot_address = "inproc://sub_addr"
        self.init(server_plugin=("lalita.plugins.zmq_proxy.ZMQPlugin",
                                 {"events_address": events_address,
                                  "bot_address": bot_address}))
        self.called = []
        try:
            self.zmq_plugin = TestPluginProcess.TestPlugin(events_address,
                                                           bot_address,
                                                           self.called)
        except Exception:
            # Do not hide the failure: with the error swallowed every test
            # would later die on a missing self.zmq_plugin instead.  The
            # proxy plugin is released here because unittest-style runners
            # skip tearDown when setUp raises.
            self.plugin.shutdown()
            raise

    def tearDown(self):
        """Close the process sockets and shut the proxy plugin down."""
        self.zmq_plugin.bot_socket.close()
        self.zmq_plugin.sub_socket.close()
        self.plugin.shutdown()
        return super(TestPluginProcess, self).tearDown()

    def test_init(self):
        """Test init is called."""
        self.assertEqual(self.called, [("init", None)])

    def test_register(self):
        """Register to an event."""
        matcher = lambda a: True
        func = lambda *a: True
        self.zmq_plugin.register(events.JOINED, func, matcher)
        self.assertIn(events.JOINED, self.zmq_plugin._events)
        self.assertEqual(self.zmq_plugin._events[events.JOINED],
                         (func, matcher))

    def test_register_command(self):
        """Registering a command stores it and notifies the bot."""
        func = lambda a: None
        self.patch(self.zmq_plugin, '_send', self.called.append)
        self.zmq_plugin.register_command(func, "command")
        self.assertIn("irc.command", self.zmq_plugin._events)
        self.assertEqual(self.zmq_plugin._events["irc.command"][0][0], func)
        self.assertIn({'action': 'register_command',
                       'command': ["command"]}, self.called)

    def test_register_commands(self):
        """Several commands can be registered one after the other."""
        func = lambda a: None
        self.patch(self.zmq_plugin, '_send', self.called.append)
        self.zmq_plugin.register_command(func, "command")
        self.assertIn("irc.command", self.zmq_plugin._events)
        self.assertEqual(self.zmq_plugin._events["irc.command"][0][0], func)
        self.assertIn({'action': 'register_command',
                       'command': ["command"]}, self.called)
        func1 = lambda a: None
        self.zmq_plugin.register_command(func1, "command1")
        self.assertEqual(len(self.zmq_plugin._events["irc.command"]), 2)
        self.assertEqual(self.zmq_plugin._events["irc.command"][1][0], func1)
        self.assertIn({'action': 'register_command',
                       'command': ["command1"]}, self.called)

    def test_say(self):
        """say() sends a 'say' request to the bot."""
        self.patch(self.zmq_plugin, '_send', self.called.append)
        self.zmq_plugin.say("me", "message")
        self.assertIn({'action': 'say', 'to_whom': 'me', 'msg': "message",
                       'args': ()}, self.called)
0205
=== added directory 'lalita/plugins/zmq_plugins'
=== added file 'lalita/plugins/zmq_plugins/example.py'
--- lalita/plugins/zmq_plugins/example.py 1970-01-01 00:00:00 +0000
+++ lalita/plugins/zmq_plugins/example.py 2012-07-09 13:48:35 +0000
@@ -0,0 +1,52 @@
1# -*- coding: utf8 -*-
2# Copyright 2009-2012 laliputienses
3# License: GPL v3
4# For further info, see LICENSE file
5
6from lalita.plugins.zmq_proxy import PluginProcess
7
8
class Example(PluginProcess):
    """Sample plugin that talks to the bot through the ZeroMQ proxy."""

    def init(self, config):
        """Register the example commands and event handlers."""
        self.logger.info("Configuring Example Plugin!")
        # commands first, in the same order they are announced to the bot
        command_handlers = (
            (self.cmd_example, "example"),
            (self.cmd_example1, "example1"),
        )
        for handler, command_name in command_handlers:
            self.register_command(handler, command_name)
        event_handlers = (
            ("irc.private_message", self.example_priv),
            ("irc.talked_to_me", self.cmd_example),
        )
        for event_name, handler in event_handlers:
            self.register(event_name, handler)

    def example_priv(self, user, command, *args):
        """Answer the user that triggered the event."""
        reply = "This is an example plugin."
        self.say(user, reply)

    def cmd_example(self, user, channel, command, *args):
        """Log the request and answer in the channel."""
        self.logger.debug("command %s from %s (args: %s)", command, user, args)
        reply = "This is an example plugin."
        self.say(channel, reply)

    def cmd_example1(self, user, channel, command, *args):
        """Log the request and answer in the channel with another text."""
        self.logger.debug("command %s from %s (args: %s)", command, user, args)
        reply = "Another example."
        self.say(channel, reply)
33
34
if __name__ == "__main__":
    import logging
    import optparse
    import traceback

    # addresses default to the ones used in lalita.cfg.sample
    parser = optparse.OptionParser()
    parser.add_option("-s", "--events-address", dest="events_address",
                      default="tcp://127.0.0.1:9090")
    parser.add_option("-b", "--bot-address", dest="bot_address",
                      default="tcp://127.0.0.1:9091")
    options, args = parser.parse_args()
    logging.basicConfig()
    logging.getLogger().setLevel(logging.DEBUG)

    try:
        Example(options.events_address, options.bot_address).run()
    except KeyboardInterrupt:
        # run() loops forever, so Ctrl-C is the normal way to stop the
        # plugin: leave quietly instead of printing a traceback.
        pass
    except Exception:
        # Only real errors are reported; SystemExit is left to propagate.
        traceback.print_exc()
52
053
=== added file 'lalita/plugins/zmq_proxy.py'
--- lalita/plugins/zmq_proxy.py 1970-01-01 00:00:00 +0000
+++ lalita/plugins/zmq_proxy.py 2012-07-09 13:48:35 +0000
@@ -0,0 +1,210 @@
1#
2# Copyright 2009-2012 laliputienses
3# License: GPL v3
4# For further info, see LICENSE file
5#
6"""ZeroMQ proxy plugin."""
7
8import json
9import logging
10import re
11
12import zmq
13
14from txzmq.pubsub import ZmqSubConnection
15from txzmq import ZmqFactory, ZmqEndpoint
16
17from lalita import Plugin
18from lalita.core import events
19
20
# Maps each lalita event to the arguments used to build its EventHandler:
#   (published zmq event name, channel position, user position)
# A position is the index of that value in the event's arguments, or None
# when the event does not carry it.
EVENT_MAP = {
    events.CONNECTION_MADE: ("irc.connection_made", None, None),
    events.CONNECTION_LOST: ("irc.connection_lost", None, None),
    events.SIGNED_ON: ("irc.signed_on", None, None),
    events.JOINED: ("irc.joined", 0, None),
    events.PRIVATE_MESSAGE: ("irc.private_message", None, 0),
    events.TALKED_TO_ME: ("irc.talked_to_me", 1, 0),
    events.COMMAND: ("irc.command", 1, 0),
    events.PUBLIC_MESSAGE: ("irc.public_message", 1, 0),
    events.ACTION: ("irc.action", 1, 0),
    events.JOIN: ("irc.join", 1, 0),
    events.LEFT: ("irc.left", 1, 0),
    events.QUIT: ("irc.quit", None, 0),
    events.KICK: ("irc.kick", 1, 0),
    events.RENAME: ("irc.rename", None, 0),
}
37
38
class BotConnection(ZmqSubConnection):
    """Bot zmq connection.

    Receives JSON requests and turns them into calls on the plugin:
    ``{"action": "say", "to_whom": ..., "msg": ..., "args": [...]}`` or
    ``{"action": "register_command", "command": [...]}``.
    """

    def __init__(self, plugin, factory, endpoint):
        ZmqSubConnection.__init__(self, factory, endpoint)
        self.plugin = plugin

    def messageReceived(self, *a, **kw):
        # plain pass-through to the base class
        return ZmqSubConnection.messageReceived(self, *a, **kw)

    def _reject(self, message):
        """Log a request that can not be handled."""
        # The rest of this module reads the plugin's ``logger`` attribute
        # (see EventHandler), so use it here as well.
        self.plugin.logger.error("Invalid Action %s", message)

    def gotMessage(self, message):
        """Decode a JSON request and execute it on the plugin.

        Malformed requests (invalid JSON, not an object, unknown action or
        missing fields) are logged and ignored instead of raising inside
        the receive callback.
        """
        try:
            info = json.loads(message)
        except ValueError:
            self._reject(message)
            return
        if not isinstance(info, dict):
            self._reject(message)
            return

        action = info.get('action')
        if action == "say":
            try:
                to_whom = info['to_whom']
                msg = info['msg']
            except KeyError:
                self._reject(message)
                return
            # the target is handed to the plugin as a utf-8 byte string
            if isinstance(to_whom, unicode):
                to_whom = to_whom.encode('utf-8')
            self.plugin.say(to_whom, msg, *info.get('args', ()))
        elif action == "register_command":
            commands = info.get('command')
            if commands is None:
                self._reject(message)
                return
            self.plugin.register_command(commands)
        else:
            self._reject(message)
61
62
class EventHandler(object):
    """Callable that republishes a lalita event as a zmq message."""

    def __init__(self, plugin, event_name, channel_pos, user_pos):
        self.plugin = plugin
        self.logger = plugin.logger
        self.name = event_name
        self.channel_pos = channel_pos
        self.user_pos = user_pos
        # NOTE(review): presumably these make the handler look like a bound
        # method of the plugin to the dispatcher -- confirm against it.
        self.im_self = plugin
        self.im_func = self.__call__

    def __call__(self, *args):
        """Publish the event, pulling channel and user out of *args*."""
        self.logger.debug("event: %s", args)
        payload = {}
        named = (('channel', self.channel_pos), ('user', self.user_pos))
        for key, position in named:
            if position is not None:
                payload[key] = args[position]
        # whatever is neither the channel nor the user goes in 'args'
        taken = (self.channel_pos, self.user_pos)
        payload['args'] = [value for index, value in enumerate(args)
                           if index not in taken]
        self.logger.debug("publishing: %s - %s", self.name, payload)
        self.plugin.publish(self.name, payload)
86
87
class ZMQPlugin(Plugin):
    """ZeroMQ plugin.

    Publishes the events listed in EVENT_MAP on a PUB socket bound to
    ``events_address`` and receives JSON requests through a BotConnection
    bound to ``bot_address``.
    """

    def init(self, config):
        """Bind both sockets and register a handler for every event.

        *config* must provide the 'events_address' and 'bot_address' keys.
        """
        self._plugins = {}
        self.config = config
        pub_address = self.config['events_address']
        cmd_address = self.config['bot_address']
        self.commands = set()  # hold all the commands
        self.ctx = zmq.Context.instance()
        self.pub_socket = self.ctx.socket(zmq.PUB)
        self.pub_socket.bind(pub_address)
        # callback/command socket
        zmq_factory = ZmqFactory(context=self.ctx)
        rpc_endpoint = ZmqEndpoint("bind", cmd_address)
        self.cmd_socket = BotConnection(self, zmq_factory, rpc_endpoint)
        self.cmd_socket.subscribe("")
        match_all = re.compile(".*")
        for event, info in EVENT_MAP.items():
            # commands are only forwarded once an external plugin asks
            # for them, see register_command
            if event != events.COMMAND:
                self.register(event, EventHandler(self, *info), match_all)

    def shutdown(self):
        """Release the sockets; safe to call even if init() never ran."""
        if hasattr(self, 'cmd_socket'):
            self.cmd_socket.shutdown()
        if hasattr(self, 'pub_socket'):
            self.pub_socket.close()
            self.ctx.term()
        for proc in getattr(self, '_plugins', {}).values():
            proc.loseConnection()

    def register_command(self, commands):
        """Register a list of commands.

        Commands that are already registered are skipped; the remaining
        ones are still registered, so a request mixing known and new
        commands no longer loses the new ones.
        """
        new_commands = [command for command in commands
                        if command not in self.commands]
        if not new_commands:
            # everything was already registered
            return
        self.commands.update(new_commands)
        self.register(events.COMMAND,
                      EventHandler(self, *EVENT_MAP[events.COMMAND]),
                      new_commands)

    def publish(self, name, msg_args):
        """Publish a message/event.

        Sent as a two-part message: the event name, then the JSON payload.
        """
        self.pub_socket.send(name, zmq.SNDMORE)
        self.pub_socket.send(json.dumps(msg_args))
132
133
class PluginProcess(object):
    """Base class for ZeroMQ plugins.

    Subclasses override init() to register their handlers and then call
    run() to process the events published by the bot.
    """

    def __init__(self, events_address, bot_address, config=None):
        """Connect to the bot and let the subclass configure itself.

        events_address: zmq address where the bot publishes its events.
        bot_address: zmq address where the bot listens for requests.
        config: optional configuration, handed unchanged to init().
        """
        self.ctx = None
        self.sub_socket = None
        self.bot_socket = None
        self.cmd_socket = None  # unused in this class, kept for compatibility
        self.config = config or {}
        self.logger = logging.getLogger("zmq_plugin.%s" %
                                        (self.__class__.__name__,))
        self._connect(events_address, bot_address)
        # setup the event handler
        self._events = {}
        self.init(config)

    def _connect(self, events_address, bot_address):
        """Create the events (SUB) and bot (PUB) sockets and connect them."""
        self.ctx = zmq.Context.instance()
        self.sub_socket = self.ctx.socket(zmq.SUB)
        self.sub_socket.connect(events_address)
        self.sub_socket.setsockopt(zmq.SUBSCRIBE, "irc")
        # create the bot socket
        self.bot_socket = self.ctx.socket(zmq.PUB)
        self.bot_socket.connect(bot_address)

    def init(self, config):
        """Subclass responsibility."""
        pass

    def _send(self, info):
        """Send *info* to the bot, encoded as JSON."""
        self.bot_socket.send(json.dumps(info))

    def register_command(self, function, command_name):
        """Register a command.

        *function* is stored as a handler for command events whose
        arguments contain *command_name*, and the bot is asked to forward
        that command.
        """
        handlers = self._events.setdefault('irc.command', [])
        handlers.append((function, lambda a: command_name in a))
        new_msg = {'action': 'register_command', 'command': [command_name]}
        self._send(new_msg)

    def register(self, event, function, matcher=None):
        """Register a event handler.

        NOTE(review): *matcher* is stored but run() never applies it to
        events other than commands.
        """
        self._events[event] = (function, matcher)

    def say(self, to_whom, msg, *args):
        """Say something."""
        new_msg = {'action': 'say', 'to_whom': to_whom, "msg": msg,
                   "args": args}
        self._send(new_msg)

    def _find_command_handler(self, payload):
        """Return the first command handler matching *payload*, or None."""
        # .get(): no command may have been registered yet, which must not
        # bring the main loop down
        for handler, matcher in self._events.get('irc.command', ()):
            if matcher(payload['args']):
                return handler
        return None

    def _dispatch(self, event, payload):
        """Call the handler registered for *event*, if there is one."""
        if event == "irc.command":
            handler = self._find_command_handler(payload)
            if handler is None:
                self.logger.debug("No match for %s", payload)
                return
        else:
            try:
                handler, _matcher = self._events[event]
            except KeyError:
                self.logger.error("No handler for %s", event)
                return
        user = payload.get('user')
        channel = payload.get('channel')
        # handlers receive user and channel (when present) before the rest
        args = [a for a in [user, channel] + payload['args']
                if a is not None]
        handler(*args)

    def run(self):
        """Main loop"""
        while True:
            # block waiting for a message: event name first, JSON after it
            event = self.sub_socket.recv()
            payload = json.loads(self.sub_socket.recv())
            self._dispatch(event, payload)
210

Subscribers

People subscribed via source and target branches