Merge ~troyanov/maas:refactor-workerpool into maas:master

Proposed by Anton Troyanov
Status: Merged
Approved by: Anton Troyanov
Approved revision: e6579d65203fd664e6f2f0a49f1f9823d548df99
Merge reported by: MAAS Lander
Merged at revision: not available
Proposed branch: ~troyanov/maas:refactor-workerpool
Merge into: maas:master
Diff against target: 347 lines (+185/-62)
2 files modified
src/maasagent/cmd/maas-agent/main.go (+26/-10)
src/maasagent/internal/workflow/worker/pool.go (+159/-52)
Reviewer            | Review Type | Date Requested | Status
MAAS Lander         |             |                | Approve
Christian Grabowski |             |                | Approve
Review via email: mp+449118@code.launchpad.net

Commit message

refactor: opts pattern to override pool defaults

To post a comment you must log in.
Revision history for this message
MAAS Lander (maas-lander) wrote :

UNIT TESTS
-b refactor-workerpool lp:~troyanov/maas/+git/maas into -b master lp:~maas-committers/maas

STATUS: SUCCESS
COMMIT: b283dfb3bfc9758842e6c65026a9b7588a675ec3

review: Approve
Revision history for this message
MAAS Lander (maas-lander) wrote :

UNIT TESTS
-b refactor-workerpool lp:~troyanov/maas/+git/maas into -b master lp:~maas-committers/maas

STATUS: SUCCESS
COMMIT: dd7bb0c5a0d8f87e5189156bacfc43fceea276e1

review: Approve
~troyanov/maas:refactor-workerpool updated
08f30b5... by Anton Troyanov

refactor: remove repetitive things

Revision history for this message
MAAS Lander (maas-lander) wrote :

UNIT TESTS
-b refactor-workerpool lp:~troyanov/maas/+git/maas into -b master lp:~maas-committers/maas

STATUS: SUCCESS
COMMIT: 53f6871aa41b82cd5c16905c728235b924c14f9d

review: Approve
Revision history for this message
MAAS Lander (maas-lander) wrote :

UNIT TESTS
-b refactor-workerpool lp:~troyanov/maas/+git/maas into -b master lp:~maas-committers/maas

STATUS: SUCCESS
COMMIT: 51fbefb67768125ac5b6015c9cc3fe7ea3a1addb

review: Approve
Revision history for this message
MAAS Lander (maas-lander) wrote :

UNIT TESTS
-b refactor-workerpool lp:~troyanov/maas/+git/maas into -b master lp:~maas-committers/maas

STATUS: FAILED
LOG: http://maas-ci.internal:8080/job/maas-tester/3362/console
COMMIT: 54136ffb92b13109c6b23a143efbd63374ce2aff

review: Needs Fixing
~troyanov/maas:refactor-workerpool updated
a8b0c1d... by Anton Troyanov

refactor: opts pattern to override pool defaults

1f0c468... by Anton Troyanov

refactor(temporal): explicit handle client fatal

Revision history for this message
Christian Grabowski (cgrabowski) wrote :

One minor nit inline; otherwise, LGTM.

review: Approve
Revision history for this message
MAAS Lander (maas-lander) wrote :

UNIT TESTS
-b refactor-workerpool lp:~troyanov/maas/+git/maas into -b master lp:~maas-committers/maas

STATUS: SUCCESS
COMMIT: fef5fdc3e23c06348c292758b105e3d9f07c348c

review: Approve
Revision history for this message
MAAS Lander (maas-lander) wrote :

UNIT TESTS
-b refactor-workerpool lp:~troyanov/maas/+git/maas into -b master lp:~maas-committers/maas

STATUS: FAILED
LOG: http://maas-ci.internal:8080/job/maas-tester/3364/console
COMMIT: b6f0afb70385efd11ab45aa1730546b959f00029

review: Needs Fixing
~troyanov/maas:refactor-workerpool updated
2f3ca5a... by Anton Troyanov

chore: add comments

Revision history for this message
MAAS Lander (maas-lander) wrote :

UNIT TESTS
-b refactor-workerpool lp:~troyanov/maas/+git/maas into -b master lp:~maas-committers/maas

STATUS: FAILED
LOG: http://maas-ci.internal:8080/job/maas-tester/3365/console
COMMIT: 2492ed624f3cb16f825c10f4ee87f01e53aff38b

review: Needs Fixing
~troyanov/maas:refactor-workerpool updated
e6579d6... by Anton Troyanov

fixup! refactor(temporal): explicit handle client fatal

Revision history for this message
MAAS Lander (maas-lander) wrote :

UNIT TESTS
-b refactor-workerpool lp:~troyanov/maas/+git/maas into -b master lp:~maas-committers/maas

STATUS: SUCCESS
COMMIT: e6579d65203fd664e6f2f0a49f1f9823d548df99

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1diff --git a/src/maasagent/cmd/maas-agent/main.go b/src/maasagent/cmd/maas-agent/main.go
2index 6336006..545272f 100644
3--- a/src/maasagent/cmd/maas-agent/main.go
4+++ b/src/maasagent/cmd/maas-agent/main.go
5@@ -19,6 +19,7 @@ import (
6 "go.temporal.io/sdk/client"
7 "go.temporal.io/sdk/converter"
8 "gopkg.in/yaml.v3"
9+ wf "maas.io/core/src/maasagent/internal/workflow"
10 wflog "maas.io/core/src/maasagent/internal/workflow/log"
11 "maas.io/core/src/maasagent/internal/workflow/worker"
12 "maas.io/core/src/maasagent/pkg/workflow/codec"
13@@ -84,17 +85,28 @@ func Run() int {
14 return 1
15 }
16
17+ workerPool := worker.NewWorkerPool(cfg.SystemID, client,
18+ worker.WithAllowedWorkflows(map[string]interface{}{
19+ "check_ip": wf.CheckIP,
20+ "commission": wf.Commission,
21+ "deploy": wf.Deploy,
22+ "deployed_os_workflow": wf.DeployedOS,
23+ "ephemeral_os_workflow": wf.EphemeralOS,
24+ "power_on": wf.PowerOn,
25+ "power_off": wf.PowerOff,
26+ "power_query": wf.PowerQuery,
27+ "power_cycle": wf.PowerCycle,
28+ }), worker.WithAllowedActivities(map[string]interface{}{
29+ "switch_boot_order": wf.SwitchBootOrderActivity,
30+ "power": wf.PowerActivity,
31+ }))
32+
33 workerPoolBackoff := backoff.NewExponentialBackOff()
34 workerPoolBackoff.MaxElapsedTime = 60 * time.Second
35
36- _, err = backoff.RetryWithData(
37- func() (*worker.WorkerPool, error) {
38- return worker.NewWorkerPool(cfg.SystemID, client)
39- }, workerPoolBackoff,
40- )
41-
42+ err = backoff.Retry(workerPool.Start, workerPoolBackoff)
43 if err != nil {
44- log.Error().Err(err).Msg("Temporal worker pool creation failure")
45+ log.Error().Err(err).Msg("Temporal worker pool failure")
46 return 1
47 }
48
49@@ -104,9 +116,13 @@ func Run() int {
50
51 signal.Notify(sigC, syscall.SIGTERM, syscall.SIGINT)
52
53- <-sigC
54-
55- return 0
56+ select {
57+ case err := <-workerPool.Error():
58+ log.Fatal().Err(err).Msg("Temporal worker pool failure")
59+ return 1
60+ case <-sigC:
61+ return 0
62+ }
63 }
64
65 // getConfig reads MAAS Agent YAML configuration file
66diff --git a/src/maasagent/internal/workflow/worker/pool.go b/src/maasagent/internal/workflow/worker/pool.go
67index a342e26..05f91ef 100644
68--- a/src/maasagent/internal/workflow/worker/pool.go
69+++ b/src/maasagent/internal/workflow/worker/pool.go
70@@ -8,71 +8,134 @@ import (
71
72 "go.temporal.io/sdk/activity"
73 "go.temporal.io/sdk/client"
74+ "go.temporal.io/sdk/temporal"
75 "go.temporal.io/sdk/worker"
76 "go.temporal.io/sdk/workflow"
77+)
78
79- wf "maas.io/core/src/maasagent/internal/workflow"
80+const (
81+ defaultAddWorkerWorkflowName = "add_worker"
82+ defaultRemoveWorkerWorkflowName = "remove_worker"
83+ defaultConfigurePoolWorkflowName = "configure_worker_pool"
84+ defaultControlPlaneTaskQueueName = "control_plane"
85 )
86
87 // WorkerPool contains a collection of Temporal Workers that can be added or
88 // removed during runtime by master worker which is responsible for execution of
89 // special workflows `add_worker` and `remove_worker`.
90 type WorkerPool struct {
91+ fatal chan error
92 client client.Client
93 // worker for control plane workflows like Add or Remove workers
94- master worker.Worker
95- // collection of workflows allowed for registration
96- workflows map[string]interface{}
97- // collection of activities allowed for registration
98- activities map[string]interface{}
99- workers map[string]worker.Worker
100- systemID string
101- mutex sync.Mutex
102+ master worker.Worker
103+ workers map[string]worker.Worker
104+ allowedWorkflows map[string]interface{}
105+ allowedActivities map[string]interface{}
106+ systemID string
107+ addWorkerWorkflowName string
108+ removeWorkerWorkflowName string
109+ configurePoolWorkflowName string
110+ controlPlaneTaskQueueName string
111+ mutex sync.Mutex
112 }
113
114 // NewWorkerPool returns WorkerPool that has a master worker listening to a
115 // Temporal Task Queue named after systemID
116-func NewWorkerPool(systemID string, client client.Client) (*WorkerPool, error) {
117+func NewWorkerPool(systemID string, client client.Client,
118+ options ...WorkerPoolOption) *WorkerPool {
119 pool := &WorkerPool{
120- systemID: systemID,
121- client: client,
122- workers: make(map[string]worker.Worker),
123- workflows: map[string]interface{}{
124- "check_ip": wf.CheckIP,
125- "commission": wf.Commission,
126- "deploy": wf.Deploy,
127- "deployed_os_workflow": wf.DeployedOS,
128- "ephemeral_os_workflow": wf.EphemeralOS,
129- "power_on": wf.PowerOn,
130- "power_off": wf.PowerOff,
131- "power_query": wf.PowerQuery,
132- "power_cycle": wf.PowerCycle,
133- },
134- activities: map[string]interface{}{
135- "switch_boot_order": wf.SwitchBootOrderActivity,
136- "power": wf.PowerActivity,
137- },
138+ systemID: systemID,
139+ client: client,
140+ workers: make(map[string]worker.Worker),
141+ addWorkerWorkflowName: defaultAddWorkerWorkflowName,
142+ removeWorkerWorkflowName: defaultRemoveWorkerWorkflowName,
143+ configurePoolWorkflowName: defaultConfigurePoolWorkflowName,
144+ controlPlaneTaskQueueName: defaultControlPlaneTaskQueueName,
145+ }
146+
147+ for _, opt := range options {
148+ opt(pool)
149 }
150
151 // master worker is responsible for adding/removing workers to/from the pool
152- pool.master = worker.New(client, systemID, worker.Options{})
153+ pool.master = worker.New(client, systemID, worker.Options{
154+ Identity: fmt.Sprintf("%s:master", systemID),
155+ OnFatalError: func(err error) { pool.fatal <- err },
156+ })
157
158- var opts workflow.RegisterOptions
159- opts = workflow.RegisterOptions{
160- Name: "add_worker",
161- }
162 pool.master.RegisterWorkflowWithOptions(
163- localActivityExec[addWorkerParam](pool.addWorker), opts,
164+ localActivityExec[addWorkerParam](pool.addWorker),
165+ workflow.RegisterOptions{
166+ Name: pool.addWorkerWorkflowName,
167+ },
168 )
169
170- opts = workflow.RegisterOptions{
171- Name: "remove_worker",
172- }
173 pool.master.RegisterWorkflowWithOptions(
174- localActivityExec[removeWorkerParam](pool.removeWorker), opts,
175+ localActivityExec[removeWorkerParam](pool.removeWorker),
176+ workflow.RegisterOptions{
177+ Name: pool.removeWorkerWorkflowName,
178+ },
179 )
180
181- return pool, pool.master.Start()
182+ return pool
183+}
184+
185+// Start starts the master worker process that controls worker pool
186+func (p *WorkerPool) Start() error {
187+ return p.master.Start()
188+}
189+
190+func (p *WorkerPool) Error() chan error {
191+ return p.fatal
192+}
193+
194+// WorkerPoolOption allows to set additional WorkerPool options
195+type WorkerPoolOption func(*WorkerPool)
196+
197+// WithAddWorkerWorkflowName sets custom addWorkerWorkflowName
198+// (default: "add_worker")
199+func WithAddWorkerWorkflowName(s string) WorkerPoolOption {
200+ return func(p *WorkerPool) {
201+ p.addWorkerWorkflowName = s
202+ }
203+}
204+
205+// WithRemoveWorkerWorkflowName sets custom removeWorkerWorkflowName
206+// (default: "remove_worker")
207+func WithRemoveWorkerWorkflowName(s string) WorkerPoolOption {
208+ return func(p *WorkerPool) {
209+ p.removeWorkerWorkflowName = s
210+ }
211+}
212+
213+// WithConfigurePoolWorkflowName sets custom configurePoolWorkflowName
214+// (default: "configure_worker_pool")
215+func WithConfigurePoolWorkflowName(s string) WorkerPoolOption {
216+ return func(p *WorkerPool) {
217+ p.configurePoolWorkflowName = s
218+ }
219+}
220+
221+// WithControlPlaneTaskQueueName sets custom controlPlaneTaskQueueName
222+// (default: "control_plane")
223+func WithControlPlaneTaskQueueName(s string) WorkerPoolOption {
224+ return func(p *WorkerPool) {
225+ p.controlPlaneTaskQueueName = s
226+ }
227+}
228+
229+// WithAllowedWorkflows sets workflows allowed to be registered
230+func WithAllowedWorkflows(workflows map[string]interface{}) WorkerPoolOption {
231+ return func(p *WorkerPool) {
232+ p.allowedWorkflows = workflows
233+ }
234+}
235+
236+// WithAllowedActivities sets activities allowed to be registered
237+func WithAllowedActivities(activities map[string]interface{}) WorkerPoolOption {
238+ return func(p *WorkerPool) {
239+ p.allowedActivities = activities
240+ }
241 }
242
243 // configureWorkerPoolParam is a parameter that should be provided to the
244@@ -109,25 +172,32 @@ func (p *WorkerPool) addWorker(param addWorkerParam) error {
245 defer p.mutex.Unlock()
246
247 if _, ok := p.workers[param.TaskQueue]; ok {
248- return fmt.Errorf("worker for TaskQueue %s is already registered in the pool", param.TaskQueue)
249+ return failedToAddWorkerError(param.TaskQueue)
250 }
251
252- w := worker.New(p.client, param.TaskQueue, worker.Options{})
253+ w := worker.New(p.client, param.TaskQueue, worker.Options{
254+ Identity: fmt.Sprintf("%s:%s", p.systemID, param.TaskQueue),
255+ OnFatalError: func(err error) { p.fatal <- err },
256+ })
257
258- for _, wf := range param.Workflows {
259- if fn, ok := p.workflows[wf]; ok {
260- w.RegisterWorkflowWithOptions(fn, workflow.RegisterOptions{Name: wf})
261- }
262+ if err := register("workflow", param.Workflows, p.allowedWorkflows,
263+ func(name string, fn interface{}) {
264+ w.RegisterWorkflowWithOptions(fn, workflow.RegisterOptions{Name: name})
265+ }); err != nil {
266+ return err
267 }
268
269- for _, act := range param.Activities {
270- if fn, ok := p.activities[act]; ok {
271- w.RegisterActivityWithOptions(fn, activity.RegisterOptions{Name: act})
272- }
273+ if err := register("activity", param.Activities, p.allowedActivities,
274+ func(name string, fn interface{}) {
275+ w.RegisterActivityWithOptions(fn, activity.RegisterOptions{Name: name})
276+ }); err != nil {
277+ return err
278 }
279
280 if err := w.Start(); err != nil {
281- return err
282+ w = nil
283+
284+ return failedToStartWorkerError(err)
285 }
286
287 p.workers[param.TaskQueue] = w
288@@ -135,6 +205,24 @@ func (p *WorkerPool) addWorker(param addWorkerParam) error {
289 return nil
290 }
291
292+func register(t string, s []string, allowed map[string]interface{},
293+ reg func(string, interface{})) error {
294+ for _, val := range s {
295+ fn, ok := allowed[val]
296+ if ok {
297+ reg(val, fn)
298+ continue
299+ }
300+
301+ return temporal.NewNonRetryableApplicationError(
302+ fmt.Sprintf("Failed registering %s", t),
303+ fmt.Sprintf("%sNotAllowed", t),
304+ fmt.Errorf("%s %q is not allowed", t, val))
305+ }
306+
307+ return nil
308+}
309+
310 type removeWorkerParam struct {
311 TaskQueue string `json:"task_queue"`
312 }
313@@ -145,9 +233,8 @@ func (p *WorkerPool) removeWorker(param removeWorkerParam) error {
314 defer p.mutex.Unlock()
315
316 w, ok := p.workers[param.TaskQueue]
317-
318 if !ok {
319- return nil
320+ return failedToRemoveWorkerError(param.TaskQueue)
321 }
322
323 w.Stop()
324@@ -167,3 +254,23 @@ func localActivityExec[T any](fn any) func(ctx workflow.Context, param T) error
325 return workflow.ExecuteLocalActivity(ctx, fn, param).Get(ctx, nil)
326 }
327 }
328+
329+// failedToAddWorkerError returns a non retryable error
330+func failedToAddWorkerError(taskQueue string) error {
331+ return temporal.NewNonRetryableApplicationError("Failed adding worker",
332+ "failedToAddWorker",
333+ fmt.Errorf("worker for task queue %q already exists", taskQueue))
334+}
335+
336+// failedToRemoveWorkerError returns a non retryable error
337+func failedToRemoveWorkerError(taskQueue string) error {
338+ return temporal.NewNonRetryableApplicationError("Failed removing worker",
339+ "failedToRemoveWorker",
340+ fmt.Errorf("worker for task queue %q doesn't exist", taskQueue))
341+}
342+
343+// failedToStartWorkerError returns a non retryable error
344+func failedToStartWorkerError(err error) error {
345+ return temporal.NewNonRetryableApplicationError("Failed to start worker",
346+ "failedToStartWorker", err)
347+}

Subscribers

People subscribed via source and target branches