Merge ~cjwatson/launchpad:kafka-producer into launchpad:master

Proposed by Colin Watson
Status: Needs review
Proposed branch: ~cjwatson/launchpad:kafka-producer
Merge into: launchpad:master
Diff against target: 324 lines (+246/-0)
10 files modified
lib/lp/services/config/schema-lazr.conf (+22/-0)
lib/lp/services/configure.zcml (+1/-0)
lib/lp/services/kafka/__init__.py (+0/-0)
lib/lp/services/kafka/client.py (+59/-0)
lib/lp/services/kafka/configure.zcml (+16/-0)
lib/lp/services/kafka/interfaces.py (+41/-0)
lib/lp/services/kafka/tests/__init__.py (+0/-0)
lib/lp/services/kafka/tests/test_client.py (+105/-0)
requirements/launchpad.txt (+1/-0)
setup.cfg (+1/-0)
Reviewer | Review Type | Date Requested | Status
Guruprasad | | | Approve
Review via email: mp+439414@code.launchpad.net

Commit message

Add a simple Kafka producer abstraction

Description of the change

This isn't used anywhere as yet, but in an earlier experiment I was able to use this to send events to Kafka on `git push`.

I want the abstraction layer in place for ease of testing, and in order that we can switch to different bindings without too much trouble, since we aren't very familiar with the operational properties of these bindings as yet.

Dependencies MP: https://code.launchpad.net/~cjwatson/lp-source-dependencies/+git/lp-source-dependencies/+merge/439413

To post a comment you must log in.
Revision history for this message
Guruprasad (lgp171188) wrote :

LGTM 👍

review: Approve

Unmerged commits

da53c25... by Colin Watson

Add a simple Kafka producer abstraction

This isn't used anywhere as yet, but in an earlier experiment I was able
to use this to send events to Kafka on `git push`.

I want the abstraction layer in place for ease of testing, and in order
that we can switch to different bindings without too much trouble, since
we aren't very familiar with the operational properties of these
bindings as yet.

Succeeded
[SUCCEEDED] docs:0 (build)
[SUCCEEDED] lint:0 (build)
[SUCCEEDED] mypy:0 (build)
1–3 of 3 results

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1diff --git a/lib/lp/services/config/schema-lazr.conf b/lib/lp/services/config/schema-lazr.conf
2index db9893c..d8ac9b8 100644
3--- a/lib/lp/services/config/schema-lazr.conf
4+++ b/lib/lp/services/config/schema-lazr.conf
5@@ -895,6 +895,28 @@ public_https: True
6 port: 11371
7
8
9+[kafka]
10+# Space-separated list of host:port strings to contact to bootstrap initial
11+# Kafka cluster metadata.
12+bootstrap_servers: none
13+
14+# A name for Launchpad's Kafka client, sent in requests to servers.
15+client_id: launchpad
16+
17+# Username to use when connecting to the Kafka cluster.
18+username: none
19+
20+# Password to use when connecting to the Kafka cluster.
21+password: none
22+
23+# Path to CA certificate file to use when connecting to the Kafka cluster.
24+ca_certificate_path: none
25+
26+# Service name to use as part of Kafka topic names. May only contain
27+# [a-z0-9.-].
28+service_name: launchpad
29+
30+
31 [karmacacheupdater]
32 # The database user which will be used by this process.
33 # datatype: string
34diff --git a/lib/lp/services/configure.zcml b/lib/lp/services/configure.zcml
35index 77dac94..356ea99 100644
36--- a/lib/lp/services/configure.zcml
37+++ b/lib/lp/services/configure.zcml
38@@ -16,6 +16,7 @@
39 <include package=".identity" />
40 <include package=".inlinehelp" file="meta.zcml" />
41 <include package=".job" />
42+ <include package=".kafka" />
43 <include package=".librarian" />
44 <include package=".macaroons" />
45 <include package=".mail" />
46diff --git a/lib/lp/services/kafka/__init__.py b/lib/lp/services/kafka/__init__.py
47new file mode 100644
48index 0000000..e69de29
49--- /dev/null
50+++ b/lib/lp/services/kafka/__init__.py
51diff --git a/lib/lp/services/kafka/client.py b/lib/lp/services/kafka/client.py
52new file mode 100644
53index 0000000..8cc9523
54--- /dev/null
55+++ b/lib/lp/services/kafka/client.py
56@@ -0,0 +1,59 @@
57+# Copyright 2023 Canonical Ltd. This software is licensed under the
58+# GNU Affero General Public License version 3 (see the file LICENSE).
59+
60+"""Kafka clients."""
61+
62+__all__ = [
63+ "KafkaProducer",
64+]
65+
66+import json
67+from typing import Optional
68+
69+from kafka import KafkaProducer as BaseKafkaProducer
70+from zope.interface import implementer
71+
72+from lp.services.config import config
73+from lp.services.kafka.interfaces import IKafkaProducer, KafkaUnconfigured
74+from lp.services.propertycache import cachedproperty, get_property_cache
75+
76+
77+@implementer(IKafkaProducer)
78+class KafkaProducer:
79+ """See `IKafkaProducer`."""
80+
81+ @cachedproperty
82+ def _producer(self) -> BaseKafkaProducer:
83+ if config.kafka.bootstrap_servers is None:
84+ raise KafkaUnconfigured()
85+ return BaseKafkaProducer(
86+ bootstrap_servers=config.kafka.bootstrap_servers.split(),
87+ client_id=config.kafka.client_id,
88+ key_serializer=lambda m: json.dumps(m).encode(),
89+ value_serializer=lambda m: json.dumps(m).encode(),
90+ security_protocol="SASL_SSL",
91+ ssl_check_hostname=False,
92+ ssl_cafile=config.kafka.ca_certificate_path,
93+ sasl_plain_username=config.kafka.username,
94+ sasl_plain_password=config.kafka.password,
95+ sasl_mechanism="SCRAM-SHA-512",
96+ )
97+
98+ def send(
99+ self,
100+ topic: str,
101+ value: Optional[object] = None,
102+ key: Optional[object] = None,
103+ ) -> None:
104+ """See `IKafkaProducer`."""
105+ self._producer.send(topic, value=value, key=key)
106+
107+ def flush(self) -> None:
108+ """See `IKafkaProducer`."""
109+ self._producer.flush()
110+
111+ def close(self) -> None:
112+ """See `IKafkaProducer`."""
113+ self._producer.flush()
114+ self._producer.close()
115+ del get_property_cache(self)._producer
116diff --git a/lib/lp/services/kafka/configure.zcml b/lib/lp/services/kafka/configure.zcml
117new file mode 100644
118index 0000000..ebdf741
119--- /dev/null
120+++ b/lib/lp/services/kafka/configure.zcml
121@@ -0,0 +1,16 @@
122+<!-- Copyright 2023 Canonical Ltd. This software is licensed under the
123+ GNU Affero General Public License version 3 (see the file LICENSE).
124+-->
125+
126+<configure
127+ xmlns="http://namespaces.zope.org/zope"
128+ xmlns:i18n="http://namespaces.zope.org/i18n"
129+ xmlns:lp="http://namespaces.canonical.com/lp"
130+ i18n_domain="launchpad">
131+
132+ <lp:securedutility
133+ class="lp.services.kafka.client.KafkaProducer"
134+ provides="lp.services.kafka.interfaces.IKafkaProducer">
135+ <allow interface="lp.services.kafka.interfaces.IKafkaProducer" />
136+ </lp:securedutility>
137+</configure>
138diff --git a/lib/lp/services/kafka/interfaces.py b/lib/lp/services/kafka/interfaces.py
139new file mode 100644
140index 0000000..bf8cfa5
141--- /dev/null
142+++ b/lib/lp/services/kafka/interfaces.py
143@@ -0,0 +1,41 @@
144+# Copyright 2023 Canonical Ltd. This software is licensed under the
145+# GNU Affero General Public License version 3 (see the file LICENSE).
146+
147+"""Kafka client interfaces."""
148+
149+__all__ = [
150+ "IKafkaProducer",
151+ "KafkaUnconfigured",
152+]
153+
154+from typing import Optional
155+
156+from zope.interface import Interface
157+
158+
159+class KafkaUnconfigured(Exception):
160+ pass
161+
162+
163+class IKafkaProducer(Interface):
164+ """A client that publishes records to the Kafka cluster."""
165+
166+ def send(
167+ topic: str,
168+ value: Optional[object] = None,
169+ key: Optional[object] = None,
170+ ) -> None:
171+ """Publish a message to a topic.
172+
173+ The value and key are serialized using JSON.
174+ """
175+
176+ def flush() -> None:
177+ """Flush all previously-sent messages to the server.
178+
179+ Blocks until all requests have completed, whether successfully or
180+ with errors.
181+ """
182+
183+ def close() -> None:
184+ """Close this producer."""
185diff --git a/lib/lp/services/kafka/tests/__init__.py b/lib/lp/services/kafka/tests/__init__.py
186new file mode 100644
187index 0000000..e69de29
188--- /dev/null
189+++ b/lib/lp/services/kafka/tests/__init__.py
190diff --git a/lib/lp/services/kafka/tests/test_client.py b/lib/lp/services/kafka/tests/test_client.py
191new file mode 100644
192index 0000000..34faf8f
193--- /dev/null
194+++ b/lib/lp/services/kafka/tests/test_client.py
195@@ -0,0 +1,105 @@
196+# Copyright 2023 Canonical Ltd. This software is licensed under the
197+# GNU Affero General Public License version 3 (see the file LICENSE).
198+
199+"""Tests for Kafka clients."""
200+
201+from unittest import mock
202+
203+from fixtures import MockPatch
204+from zope.component import getUtility
205+
206+from lp.services.kafka.interfaces import IKafkaProducer, KafkaUnconfigured
207+from lp.services.propertycache import clear_property_cache
208+from lp.testing import TestCase
209+from lp.testing.layers import ZopelessLayer
210+
211+
212+class TestKafkaProducer(TestCase):
213+
214+ layer = ZopelessLayer
215+
216+ def setUp(self):
217+ super().setUp()
218+ self.pushConfig(
219+ "kafka",
220+ bootstrap_servers="kafka.example.com:9092",
221+ username="launchpad-service",
222+ password="secret",
223+ )
224+ self.mock_producer_class = self.useFixture(
225+ MockPatch("lp.services.kafka.client.BaseKafkaProducer")
226+ ).mock
227+ self.bootstrap_call = mock.call(
228+ bootstrap_servers=["kafka.example.com:9092"],
229+ client_id="launchpad",
230+ key_serializer=mock.ANY,
231+ value_serializer=mock.ANY,
232+ security_protocol="SASL_SSL",
233+ ssl_check_hostname=False,
234+ ssl_cafile=None,
235+ sasl_plain_username="launchpad-service",
236+ sasl_plain_password="secret",
237+ sasl_mechanism="SCRAM-SHA-512",
238+ )
239+ self.addCleanup(clear_property_cache, getUtility(IKafkaProducer))
240+
241+ def test_unconfigured(self):
242+ self.pushConfig("kafka", bootstrap_servers=None)
243+ self.assertRaises(
244+ KafkaUnconfigured, getUtility(IKafkaProducer).send, "test-topic"
245+ )
246+
247+ def test_send(self):
248+ value = {"event": "create"}
249+ key = {"id": 1}
250+ getUtility(IKafkaProducer).send("test-topic", value=value, key=key)
251+ self.mock_producer_class.assert_has_calls(
252+ [
253+ self.bootstrap_call,
254+ self.bootstrap_call.send(
255+ "test-topic", value={"event": "create"}, key={"id": 1}
256+ ),
257+ ],
258+ any_order=False,
259+ )
260+
261+ def test_send_and_flush(self):
262+ value = {"event": "create"}
263+ key = {"id": 1}
264+ getUtility(IKafkaProducer).send("test-topic", value=value, key=key)
265+ getUtility(IKafkaProducer).flush()
266+ self.mock_producer_class.assert_has_calls(
267+ [
268+ self.bootstrap_call,
269+ self.bootstrap_call.send(
270+ "test-topic", value={"event": "create"}, key={"id": 1}
271+ ),
272+ self.bootstrap_call.flush(),
273+ ],
274+ any_order=False,
275+ )
276+
277+ def test_close(self):
278+ getUtility(IKafkaProducer).close()
279+ self.mock_producer_class.assert_has_calls(
280+ [
281+ self.bootstrap_call,
282+ self.bootstrap_call.flush(),
283+ self.bootstrap_call.close(),
284+ ],
285+ any_order=False,
286+ )
287+
288+ def test_close_and_reopen(self):
289+ getUtility(IKafkaProducer).close()
290+ getUtility(IKafkaProducer).flush()
291+ self.mock_producer_class.assert_has_calls(
292+ [
293+ self.bootstrap_call,
294+ self.bootstrap_call.flush(),
295+ self.bootstrap_call.close(),
296+ self.bootstrap_call,
297+ self.bootstrap_call.flush(),
298+ ],
299+ any_order=False,
300+ )
301diff --git a/requirements/launchpad.txt b/requirements/launchpad.txt
302index 0b7e852..caca0d9 100644
303--- a/requirements/launchpad.txt
304+++ b/requirements/launchpad.txt
305@@ -72,6 +72,7 @@ iso8601==0.1.12
306 jedi==0.17.2
307 jmespath==0.10.0
308 jsautobuild==0.2
309+kafka-python==2.0.2
310 keyring==0.6.2
311 keystoneauth1==4.1.0
312 kombu==4.6.11
313diff --git a/setup.cfg b/setup.cfg
314index be574c6..6e0ea0e 100644
315--- a/setup.cfg
316+++ b/setup.cfg
317@@ -44,6 +44,7 @@ install_requires =
318 importlib-resources; python_version < "3.7"
319 ipython
320 jsautobuild
321+ kafka-python
322 kombu
323 launchpad-buildd
324 launchpadlib

Subscribers

People subscribed via source and target branches

to status/vote changes: