Merge ~troyanov/maas:temporal-go-worker into maas:master

Proposed by Anton Troyanov
Status: Merged
Approved by: Anton Troyanov
Approved revision: c6b268dcfacd971773921250be8b0f0da14c7f44
Merge reported by: MAAS Lander
Merged at revision: not available
Proposed branch: ~troyanov/maas:temporal-go-worker
Merge into: maas:master
Diff against target: 507 lines (+343/-4)
15 files modified
debian/extras/99-maas-agent-sudoers (+1/-0)
debian/maas-agent.install (+3/-0)
debian/maas-agent.maas-agent.service (+14/-0)
debian/not-installed (+0/-3)
debian/rules (+1/-0)
snap/local/tree/bin/run-maas-agent (+6/-0)
snap/local/tree/usr/share/maas/pebble/layers/003-maas-rack-layer.yaml (+5/-0)
snap/snapcraft.yaml (+10/-0)
src/maasagent/cmd/maas-agent/main.go (+65/-1)
src/maasagent/internal/workflow/log/adapter.go (+45/-0)
src/maasagent/internal/workflow/pool.go (+153/-0)
src/maasserver/models/service.py (+2/-0)
src/provisioningserver/rackdservices/external.py (+19/-0)
src/provisioningserver/service_monitor.py (+9/-0)
src/provisioningserver/tests/test_service_monitor.py (+10/-0)
Reviewers:
MAAS Lander: Approve
Christian Grabowski: Approve
Review via email: mp+447784@code.launchpad.net

Commit message

feat(temporal): maas-agent with temporal worker

Add worker pool with control plane for dynamic registration of Temporal workers.
Add adapter for zerolog that allows usage of custom logger for Temporal client.
Add snap and deb packaging for maas-agent

Co-authored-by: Christian Grabowski <email address hidden>

Description of the change

Here are some examples:
1. I want to execute the `CheckIP` Workflow on Task Queue `vlan-1`:

`tctl --ad 10.0.0.62:5271 workflow run --taskqueue vlan-1 --workflow_type CheckIP --input '{"IPs":["172.16.1.1", "172.16.1.11"]}'`

Since no Worker is registered for that Workflow type on that Task Queue, nothing will happen.

2. I trigger the *control plane* Workflow `AddWorker` on Task Queue `amazed-asp` and pass a parameter that says: register a new Worker on Task Queue `vlan-1`, and have that Worker register the `CheckIP` Workflow.

`tctl --ad 10.0.0.62:5271 workflow run --taskqueue amazed-asp --workflow_type AddWorker --input '{"TaskQueue":"vlan-1", "Workflows":["CheckIP"]}'`

3. Running the command from step 1 again now gives me a result:
```
Result:
  Run Time: 1 seconds
  Status: COMPLETED
  Output: [{"IPs":{"172.16.1.1":"ABY+SvJA","172.16.1.11":"ABY+SvJA"}}]
```

4. If I no longer want such a Worker on a certain host, I execute the `RemoveWorker` Workflow on Task Queue `amazed-asp`, which removes the Worker completely. Note that you cannot unregister a specific Workflow from a Worker; the whole Worker is removed.

`tctl --ad 10.0.0.62:5271 workflow run --taskqueue amazed-asp --workflow_type RemoveWorker --input '{"TaskQueue":"vlan-1"}'`
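
As a rough sketch, the `--input` payloads used in steps 2 and 4 can be produced from Go structs that mirror the fields of `addWorkerParam` and `removeWorkerParam` in `pool.go`. The exported type names `AddWorkerInput` and `RemoveWorkerInput` and the `encode` helper are illustrative assumptions, not part of this branch; only the field names come from the diff.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// AddWorkerInput mirrors the fields of addWorkerParam (hypothetical type name).
type AddWorkerInput struct {
	TaskQueue  string
	Workflows  []string
	Activities []string `json:",omitempty"` // left out of the JSON when empty
}

// RemoveWorkerInput mirrors the fields of removeWorkerParam (hypothetical type name).
type RemoveWorkerInput struct {
	TaskQueue string
}

// encode returns the compact JSON string that would be passed to --input.
func encode(v any) (string, error) {
	b, err := json.Marshal(v)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	add, err := encode(AddWorkerInput{TaskQueue: "vlan-1", Workflows: []string{"CheckIP"}})
	if err != nil {
		panic(err)
	}
	remove, err := encode(RemoveWorkerInput{TaskQueue: "vlan-1"})
	if err != nil {
		panic(err)
	}
	fmt.Println(add)
	fmt.Println(remove)
}
```

Because the structs carry no JSON tags that rename fields, the keys in the payload match the Go field names exactly, which is what the examples above rely on.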

MAAS Lander (maas-lander) wrote :

UNIT TESTS
-b temporal-go-worker lp:~troyanov/maas/+git/maas into -b master lp:~maas-committers/maas

STATUS: SUCCESS
COMMIT: 415a890a13ff72513c8a2c1a128fa1edf79064d7

review: Approve
Christian Grabowski (cgrabowski) wrote :

+1

review: Approve
MAAS Lander (maas-lander) wrote :

LANDING
-b temporal-go-worker lp:~troyanov/maas/+git/maas into -b master lp:~maas-committers/maas

STATUS: FAILED BUILD
LOG: http://maas-ci.internal:8080/job/maas-tester/3170/console

MAAS Lander (maas-lander) wrote :

UNIT TESTS
-b temporal-go-worker lp:~troyanov/maas/+git/maas into -b master lp:~maas-committers/maas

STATUS: FAILED
LOG: http://maas-ci.internal:8080/job/maas-tester/3171/console
COMMIT: 7358dffe3344b08759e3299254e5858cfdc644a8

review: Needs Fixing
MAAS Lander (maas-lander) wrote :

UNIT TESTS
-b temporal-go-worker lp:~troyanov/maas/+git/maas into -b master lp:~maas-committers/maas

STATUS: SUCCESS
COMMIT: c6b268dcfacd971773921250be8b0f0da14c7f44

review: Approve

Preview Diff

diff --git a/debian/extras/99-maas-agent-sudoers b/debian/extras/99-maas-agent-sudoers
new file mode 100644
index 0000000..a8e915a
--- /dev/null
+++ b/debian/extras/99-maas-agent-sudoers
@@ -0,0 +1 @@
+maas ALL= NOPASSWD: /usr/sbin/maas-agent
diff --git a/debian/maas-agent.install b/debian/maas-agent.install
new file mode 100644
index 0000000..71c9e73
--- /dev/null
+++ b/debian/maas-agent.install
@@ -0,0 +1,3 @@
+debian/extras/99-maas-agent-sudoers etc/sudoers.d
+
+bin/maas-agent usr/sbin
diff --git a/debian/maas-agent.maas-agent.service b/debian/maas-agent.maas-agent.service
new file mode 100644
index 0000000..e16fd8a
--- /dev/null
+++ b/debian/maas-agent.maas-agent.service
@@ -0,0 +1,14 @@
+[Unit]
+Description=The MAAS Agent daemon
+Documentation=https://maas.io/docs
+Wants=network-online.target
+After=network-online.target
+BindsTo=maas-rackd.service
+
+[Service]
+User=maas
+Group=maas
+ExecStart=/usr/sbin/maas-agent
+
+[Install]
+WantedBy=multi-user.target
diff --git a/debian/not-installed b/debian/not-installed
index a136782..4bee65a 100644
--- a/debian/not-installed
+++ b/debian/not-installed
@@ -2,6 +2,3 @@
 /usr/bin/test.*
 /usr/bin/maas-sampledata
 
-# Temporary exclude maas-agent
-/bin/maas-agent
-
diff --git a/debian/rules b/debian/rules
index ad435b8..61def6d 100755
--- a/debian/rules
+++ b/debian/rules
@@ -20,6 +20,7 @@ override_dh_installsystemd:
 	dh_installsystemd -p maas-proxy --name=maas-proxy maas-proxy.service
 	dh_installsystemd -p maas-dhcp --name=maas-dhcpd maas-dhcpd.service
 	dh_installsystemd -p maas-dhcp --name=maas-dhcpd6 maas-dhcpd6.service
+	dh_installsystemd -p maas-agent --name=maas-agent maas-agent.service
 
 override_dh_auto_install:
 	dh_auto_install
diff --git a/snap/local/tree/bin/run-maas-agent b/snap/local/tree/bin/run-maas-agent
new file mode 100755
index 0000000..8c37a57
--- /dev/null
+++ b/snap/local/tree/bin/run-maas-agent
@@ -0,0 +1,6 @@
+#!/bin/bash
+
+# Copyright 2023 Canonical Ltd. This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+exec "$SNAP/usr/sbin/maas-agent"
diff --git a/snap/local/tree/usr/share/maas/pebble/layers/003-maas-rack-layer.yaml b/snap/local/tree/usr/share/maas/pebble/layers/003-maas-rack-layer.yaml
index ec97214..82ce9a0 100644
--- a/snap/local/tree/usr/share/maas/pebble/layers/003-maas-rack-layer.yaml
+++ b/snap/local/tree/usr/share/maas/pebble/layers/003-maas-rack-layer.yaml
@@ -28,3 +28,8 @@ services:
     override: replace
     startup: disabled
     command: sh -c "exec $SNAP/bin/run-dhcpd6"
+
+  agent:
+    override: replace
+    startup: disabled
+    command: sh -c "exec $SNAP/bin/run-maas-agent"
diff --git a/snap/snapcraft.yaml b/snap/snapcraft.yaml
index b2209a0..04e63b1 100644
--- a/snap/snapcraft.yaml
+++ b/snap/snapcraft.yaml
@@ -268,6 +268,16 @@ parts:
     prime:
       - usr/sbin/maas-netmon
 
+  maas-agent:
+    plugin: make
+    source: src/maasagent
+    build-packages:
+      - golang-go
+    organize:
+      bin/maas-agent: usr/sbin/maas-agent
+    prime:
+      - usr/sbin/maas-agent
+
   tree:
     plugin: dump
     source: snap/local/tree
diff --git a/src/maasagent/cmd/maas-agent/main.go b/src/maasagent/cmd/maas-agent/main.go
index 7905807..3b4623c 100644
--- a/src/maasagent/cmd/maas-agent/main.go
+++ b/src/maasagent/cmd/maas-agent/main.go
@@ -1,5 +1,69 @@
 package main
 
-func main() {
+/*
+	Copyright 2023 Canonical Ltd. This software is licensed under the
+	GNU Affero General Public License version 3 (see the file LICENSE).
+*/
+
+import (
+	"os"
+	"os/signal"
+	"syscall"
+
+	"github.com/rs/zerolog"
+	"github.com/rs/zerolog/log"
+	"go.temporal.io/sdk/client"
+	"launchpad.net/maas/maas/src/maasagent/internal/workflow"
+	wflog "launchpad.net/maas/maas/src/maasagent/internal/workflow/log"
+)
+
+func Run() int {
+	zerolog.SetGlobalLevel(zerolog.InfoLevel)
+
+	log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr})
+
+	if envLogLevel, ok := os.LookupEnv("LOG_LEVEL"); ok {
+		if logLevel, err := zerolog.ParseLevel(envLogLevel); err != nil {
+			log.Warn().Str("LOG_LEVEL", envLogLevel).Msg("Unknown log level, defaulting to INFO")
+		} else {
+			zerolog.SetGlobalLevel(logLevel)
+		}
+	}
+
+	// TODO: add contextual fields to the global logger?
+	log.Info().Msg("Starting MAAS Agent service")
+
+	client, err := client.Dial(client.Options{
+		// TODO: fetch from rack config?
+		HostPort: "localhost:5271",
+		Logger:   wflog.New(log.Logger),
+	})
+
+	if err != nil {
+		log.Error().Err(err).Send()
+		return 1
+	}
 
+	// TODO: init Data Encoder
+	// TODO: read real systemID
+	_, err = workflow.NewWorkerPool("systemID", client)
+
+	if err != nil {
+		log.Error().Err(err).Send()
+		return 1
+	}
+
+	log.Info().Msg("Service MAAS Agent started")
+
+	sigC := make(chan os.Signal, 2)
+
+	signal.Notify(sigC, syscall.SIGTERM, syscall.SIGINT)
+
+	<-sigC
+
+	return 0
+}
+
+func main() {
+	os.Exit(Run())
 }
diff --git a/src/maasagent/internal/workflow/log/adapter.go b/src/maasagent/internal/workflow/log/adapter.go
new file mode 100644
index 0000000..0b16241
--- /dev/null
+++ b/src/maasagent/internal/workflow/log/adapter.go
@@ -0,0 +1,45 @@
+package log
+
+import (
+	"github.com/rs/zerolog"
+)
+
+// Logger is an adapter that allows usage of zerolog with Temporal Client
+type Logger struct {
+	logger zerolog.Logger
+}
+
+// New returns new workflow Logger
+func New(logger zerolog.Logger) *Logger {
+	return &Logger{
+		logger: logger,
+	}
+}
+
+// Debug implements Temporal log.Logger interface
+func (l *Logger) Debug(msg string, keyvals ...interface{}) {
+	sendEvent(l.logger.Debug(), msg, keyvals)
+}
+
+// Info implements Temporal log.Logger interface
+func (l *Logger) Info(msg string, keyvals ...interface{}) {
+	sendEvent(l.logger.Info(), msg, keyvals)
+}
+
+// Warn implements Temporal log.Logger interface
+func (l *Logger) Warn(msg string, keyvals ...interface{}) {
+	sendEvent(l.logger.Warn(), msg, keyvals)
+}
+
+// Error implements Temporal log.Logger interface
+func (l *Logger) Error(msg string, keyvals ...interface{}) {
+	sendEvent(l.logger.Error(), msg, keyvals)
+}
+
+func sendEvent(event *zerolog.Event, msg string, keyvals ...interface{}) {
+	if len(keyvals) > 0 {
+		event.Fields(keyvals[0])
+	}
+
+	event.Msg(msg)
+}
diff --git a/src/maasagent/internal/workflow/pool.go b/src/maasagent/internal/workflow/pool.go
new file mode 100644
index 0000000..bbc6faa
--- /dev/null
+++ b/src/maasagent/internal/workflow/pool.go
@@ -0,0 +1,153 @@
+package workflow
+
+import (
+	"context"
+	"fmt"
+	"sync"
+	"time"
+
+	"go.temporal.io/sdk/client"
+	"go.temporal.io/sdk/worker"
+	"go.temporal.io/sdk/workflow"
+)
+
+// WorkerPool contains a collection of Temporal Workers that can be added or
+// removed in runtime, through master worker that is responsible for execution of
+// special workflows AddWorker and RemoveWorker.
+// WorkerPool allows to register specific Workflows and Activities for the
+// added workers.
+type WorkerPool struct {
+	client client.Client
+	// worker for control plane workflows like Add or Remove workers
+	master worker.Worker
+	// collection of workflows allowed for registration
+	workflows map[string]interface{}
+	// collection of activities allowed for registration
+	activities map[string]interface{}
+	workers    map[string]worker.Worker
+	systemID   string
+	mutex      sync.Mutex
+}
+
+// NewWorkerPool returns WorkerPool that has a master worker listening to a
+// Temporal Task Queue {systemID}
+func NewWorkerPool(systemID string, client client.Client) (*WorkerPool, error) {
+	pool := &WorkerPool{
+		systemID: systemID,
+		client:   client,
+		workers:  make(map[string]worker.Worker),
+		workflows: map[string]interface{}{
+			"CheckIP": CheckIP,
+		},
+		activities: map[string]interface{}{},
+	}
+
+	// master worker responsible for adding/removing workers from the pool
+	pool.master = worker.New(client, systemID, worker.Options{})
+
+	var opts workflow.RegisterOptions
+	opts = workflow.RegisterOptions{
+		Name: "AddWorker",
+	}
+	pool.master.RegisterWorkflowWithOptions(
+		exec[addWorkerParam](pool.addWorker), opts,
+	)
+
+	opts = workflow.RegisterOptions{
+		Name: "RemoveWorker",
+	}
+	pool.master.RegisterWorkflowWithOptions(
+		exec[removeWorkerParam](pool.removeWorker), opts,
+	)
+
+	return pool, pool.master.Start()
+}
+
+// configureParam is a parameter that should be provided to Configure workflow
+type configureParam struct {
+	SystemID string
+}
+
+// Configure calls Configure workflow to be executed.
+// This workflow will configure WorkerPool with a proper set of workers.
+// E.g. it will call AddWorker and RemoveWorker workflows.
+func (p *WorkerPool) Configure(ctx context.Context) error {
+	workflowOptions := client.StartWorkflowOptions{
+		TaskQueue: "control-plane",
+	}
+
+	workflowRun, err := p.client.ExecuteWorkflow(ctx, workflowOptions,
+		"Configure", configureParam{SystemID: p.systemID})
+	if err != nil {
+		return err
+	}
+
+	return workflowRun.Get(ctx, nil)
+}
+
+type addWorkerParam struct {
+	TaskQueue  string
+	Workflows  []string
+	Activities []string
+}
+
+// addWorker adds worker to the WorkerPool and registers workflows and activities
+func (p *WorkerPool) addWorker(param addWorkerParam) error {
+	p.mutex.Lock()
+	defer p.mutex.Unlock()
+
+	if _, ok := p.workers[param.TaskQueue]; ok {
+		return fmt.Errorf("worker for TaskQueue %s is already registered in the pool", param.TaskQueue)
+	}
+
+	w := worker.New(p.client, param.TaskQueue, worker.Options{})
+
+	for _, workflow := range param.Workflows {
+		if fn, ok := p.workflows[workflow]; ok {
+			w.RegisterWorkflow(fn)
+		}
+	}
+
+	for _, activity := range param.Activities {
+		if fn, ok := p.activities[activity]; ok {
+			w.RegisterActivity(fn)
+		}
+	}
+
+	if err := w.Start(); err != nil {
+		return err
+	}
+
+	p.workers[param.TaskQueue] = w
+
+	return nil
+}
+
+type removeWorkerParam struct {
+	TaskQueue string
+}
+
+// removeWorker stops worker of a certain TaskQueue and removes it from the pool
+func (p *WorkerPool) removeWorker(param removeWorkerParam) error {
+	p.mutex.Lock()
+	defer p.mutex.Unlock()
+
+	if w, ok := p.workers[param.TaskQueue]; ok {
+		w.Stop()
+		delete(p.workers, param.TaskQueue)
+	}
+
+	return nil
+}
+
+// exec will execute provided function as Local Activity
+func exec[T any](fn any) func(ctx workflow.Context, param T) error {
+	return func(ctx workflow.Context, param T) error {
+		lao := workflow.LocalActivityOptions{
+			ScheduleToCloseTimeout: 5 * time.Second,
+		}
+		ctx = workflow.WithLocalActivityOptions(ctx, lao)
+
+		return workflow.ExecuteLocalActivity(ctx, fn, param).Get(ctx, nil)
+	}
+}
diff --git a/src/maasserver/models/service.py b/src/maasserver/models/service.py
index 886ee8c..4680ff9 100644
--- a/src/maasserver/models/service.py
+++ b/src/maasserver/models/service.py
@@ -37,6 +37,7 @@ RACK_SERVICES = frozenset(
         "dns_rack",
         "proxy_rack",
         "syslog_rack",
+        "agent",
     }
 )
 
@@ -59,6 +60,7 @@ DEAD_STATUSES = {
     "syslog_rack": SERVICE_STATUS.UNKNOWN,
     "reverse_proxy": SERVICE_STATUS.UNKNOWN,
     "temporal": SERVICE_STATUS.UNKNOWN,
+    "agent": SERVICE_STATUS.UNKNOWN,
 }
 
 
diff --git a/src/provisioningserver/rackdservices/external.py b/src/provisioningserver/rackdservices/external.py
index 4944b02..94b1576 100644
--- a/src/provisioningserver/rackdservices/external.py
+++ b/src/provisioningserver/rackdservices/external.py
@@ -392,6 +392,24 @@ class RackSyslog(RackOnlyExternalService):
         )
 
 
+class RackAgent(RackOnlyExternalService):
+    """External maas-agent service"""
+
+    service_name = "agent"
+
+    def _configure(self, configuration):
+        return
+
+    def _tryUpdate(self, config):
+        d = maybeDeferred(
+            self._getConfiguration,
+        )
+        return d
+
+    def _getConfiguration(self):
+        return
+
+
 class RackExternalService(TimerService):
     # Initial start the interval is low so that forwarders of bind9 gets
     # at least one region controller. When no region controllers are set
@@ -417,6 +435,7 @@ class RackExternalService(TimerService):
             ("DNS", RackDNS()),
             ("proxy", RackProxy()),
             ("syslog", RackSyslog()),
+            ("agent", RackAgent()),
         ]
 
     def _update_interval(self, config):
diff --git a/src/provisioningserver/service_monitor.py b/src/provisioningserver/service_monitor.py
index 20d7642..de2e115 100644
--- a/src/provisioningserver/service_monitor.py
+++ b/src/provisioningserver/service_monitor.py
@@ -66,6 +66,14 @@ class SyslogServiceOnRack(ToggleableService):
     snap_service_name = "syslog"
 
 
+class AgentServiceOnRack(AlwaysOnService):
+    """Monitored MAAS Agent service on a rack controller host."""
+
+    name = "agent"
+    service_name = "maas-agent"
+    snap_service_name = "agent"
+
+
 # Global service monitor for rackd. NOTE that changes to this need to be
 # mirrored in maasserver.model.services.
 service_monitor = ServiceMonitor(
@@ -76,4 +84,5 @@ service_monitor = ServiceMonitor(
     DNSServiceOnRack(),
     ProxyServiceOnRack(),
     SyslogServiceOnRack(),
+    AgentServiceOnRack(),
 )
diff --git a/src/provisioningserver/tests/test_service_monitor.py b/src/provisioningserver/tests/test_service_monitor.py
index 8147d5b..3116eec 100644
--- a/src/provisioningserver/tests/test_service_monitor.py
+++ b/src/provisioningserver/tests/test_service_monitor.py
@@ -6,6 +6,7 @@
 
 from maastesting.testcase import MAASTestCase
 from provisioningserver.service_monitor import (
+    AgentServiceOnRack,
     DHCPv4Service,
     DHCPv6Service,
     DNSServiceOnRack,
@@ -66,6 +67,14 @@ class TestSyslogServiceOnRack(MAASTestCase):
         self.assertEqual("syslog_rack", syslog.name)
 
 
+class TestAgentServiceOnRack(MAASTestCase):
+    def test_name_and_service_name(self):
+        syslog = AgentServiceOnRack()
+        self.assertEqual("maas-agent", syslog.service_name)
+        self.assertEqual("agent", syslog.snap_service_name)
+        self.assertEqual("agent", syslog.name)
+
+
 class TestGlobalServiceMonitor(MAASTestCase):
     def test_includes_all_services(self):
         self.assertEqual(
@@ -77,6 +86,7 @@ class TestGlobalServiceMonitor(MAASTestCase):
                 "ntp_rack",
                 "proxy_rack",
                 "syslog_rack",
+                "agent",
             },
             service_monitor._services.keys(),
         )
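
The add/remove bookkeeping of the worker pool can be tried out without a Temporal server. The following is a minimal, stdlib-only sketch: `stubWorker`, `pool`, `newPool`, `add` and `remove` are hypothetical stand-ins, not the `WorkerPool` API from `pool.go`. It assumes the behaviour described in the change: one worker per Task Queue, duplicate Task Queues rejected, workflow names outside the allow-list skipped, and removal dropping the whole worker. In this sketch the lookup and the delete happen under a single lock.

```go
package main

import (
	"fmt"
	"sync"
)

// stubWorker stands in for a Temporal worker; it only records state.
type stubWorker struct {
	workflows []string
	stopped   bool
}

// pool is a simplified, hypothetical stand-in for WorkerPool.
type pool struct {
	mu      sync.Mutex
	allowed map[string]bool        // workflow names that may be registered
	workers map[string]*stubWorker // one worker per Task Queue
}

func newPool(allowed ...string) *pool {
	p := &pool{allowed: map[string]bool{}, workers: map[string]*stubWorker{}}
	for _, name := range allowed {
		p.allowed[name] = true
	}
	return p
}

// add registers a worker for taskQueue; names not in the allow-list are skipped.
func (p *pool) add(taskQueue string, workflows []string) error {
	p.mu.Lock()
	defer p.mu.Unlock()

	if _, ok := p.workers[taskQueue]; ok {
		return fmt.Errorf("worker for TaskQueue %s is already registered in the pool", taskQueue)
	}
	w := &stubWorker{}
	for _, name := range workflows {
		if p.allowed[name] {
			w.workflows = append(w.workflows, name)
		}
	}
	p.workers[taskQueue] = w
	return nil
}

// remove stops and drops the whole worker for taskQueue, if there is one.
func (p *pool) remove(taskQueue string) {
	p.mu.Lock()
	defer p.mu.Unlock()

	if w, ok := p.workers[taskQueue]; ok {
		w.stopped = true
		delete(p.workers, taskQueue)
	}
}

func main() {
	p := newPool("CheckIP")
	fmt.Println(p.add("vlan-1", []string{"CheckIP", "Unknown"}))
	fmt.Println(p.add("vlan-1", []string{"CheckIP"}))
	p.remove("vlan-1")
	fmt.Println(len(p.workers))
}
```

Keeping the map read inside the critical section means two concurrent removals of the same Task Queue cannot both observe the worker and stop it twice.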
