Merge ~troyanov/maas:refactor-workerpool into maas:master
- Git
- lp:~troyanov/maas
- refactor-workerpool
- Merge into master
Status: | Merged |
---|---|
Approved by: | Anton Troyanov |
Approved revision: | e6579d65203fd664e6f2f0a49f1f9823d548df99 |
Merge reported by: | MAAS Lander |
Merged at revision: | not available |
Proposed branch: | ~troyanov/maas:refactor-workerpool |
Merge into: | maas:master |
Diff against target: | 347 lines (+185/-62), 2 files modified: src/maasagent/cmd/maas-agent/main.go (+26/-10), src/maasagent/internal/workflow/worker/pool.go (+159/-52) |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
MAAS Lander | | | Approve |
Christian Grabowski | | | Approve |
Review via email: mp+449118@code.launchpad.net |
Commit message
refactor: opts pattern to override pool defaults
Description of the change
MAAS Lander (maas-lander) wrote : | # |
UNIT TESTS
-b refactor-workerpool lp:~troyanov/maas/+git/maas into -b master lp:~maas-committers/maas
STATUS: SUCCESS
COMMIT: dd7bb0c5a0d8f87
- 08f30b5... by Anton Troyanov

  refactor: remove repetitive things
MAAS Lander (maas-lander) wrote : | # |
UNIT TESTS
-b refactor-workerpool lp:~troyanov/maas/+git/maas into -b master lp:~maas-committers/maas
STATUS: SUCCESS
COMMIT: 53f6871aa41b82c
MAAS Lander (maas-lander) wrote : | # |
UNIT TESTS
-b refactor-workerpool lp:~troyanov/maas/+git/maas into -b master lp:~maas-committers/maas
STATUS: SUCCESS
COMMIT: 51fbefb67768125
MAAS Lander (maas-lander) wrote : | # |
UNIT TESTS
-b refactor-workerpool lp:~troyanov/maas/+git/maas into -b master lp:~maas-committers/maas
STATUS: FAILED
LOG: http://
COMMIT: 54136ffb92b1310
- a8b0c1d... by Anton Troyanov

  refactor: opts pattern to override pool defaults
- 1f0c468... by Anton Troyanov

  refactor(temporal): explicit handle client fatal
Christian Grabowski (cgrabowski) wrote : | # |
One minor nit inline, otherwise, lgtm.
MAAS Lander (maas-lander) wrote : | # |
UNIT TESTS
-b refactor-workerpool lp:~troyanov/maas/+git/maas into -b master lp:~maas-committers/maas
STATUS: SUCCESS
COMMIT: fef5fdc3e23c063
MAAS Lander (maas-lander) wrote : | # |
UNIT TESTS
-b refactor-workerpool lp:~troyanov/maas/+git/maas into -b master lp:~maas-committers/maas
STATUS: FAILED
LOG: http://
COMMIT: b6f0afb70385efd
- 2f3ca5a... by Anton Troyanov

  chore: add comments
MAAS Lander (maas-lander) wrote : | # |
UNIT TESTS
-b refactor-workerpool lp:~troyanov/maas/+git/maas into -b master lp:~maas-committers/maas
STATUS: FAILED
LOG: http://
COMMIT: 2492ed624f3cb16
- e6579d6... by Anton Troyanov

  fixup! refactor(temporal): explicit handle client fatal
MAAS Lander (maas-lander) wrote : | # |
UNIT TESTS
-b refactor-workerpool lp:~troyanov/maas/+git/maas into -b master lp:~maas-committers/maas
STATUS: SUCCESS
COMMIT: e6579d65203fd66
Preview Diff
1 | diff --git a/src/maasagent/cmd/maas-agent/main.go b/src/maasagent/cmd/maas-agent/main.go |
2 | index 6336006..545272f 100644 |
3 | --- a/src/maasagent/cmd/maas-agent/main.go |
4 | +++ b/src/maasagent/cmd/maas-agent/main.go |
5 | @@ -19,6 +19,7 @@ import ( |
6 | "go.temporal.io/sdk/client" |
7 | "go.temporal.io/sdk/converter" |
8 | "gopkg.in/yaml.v3" |
9 | + wf "maas.io/core/src/maasagent/internal/workflow" |
10 | wflog "maas.io/core/src/maasagent/internal/workflow/log" |
11 | "maas.io/core/src/maasagent/internal/workflow/worker" |
12 | "maas.io/core/src/maasagent/pkg/workflow/codec" |
13 | @@ -84,17 +85,28 @@ func Run() int { |
14 | return 1 |
15 | } |
16 | |
17 | + workerPool := worker.NewWorkerPool(cfg.SystemID, client, |
18 | + worker.WithAllowedWorkflows(map[string]interface{}{ |
19 | + "check_ip": wf.CheckIP, |
20 | + "commission": wf.Commission, |
21 | + "deploy": wf.Deploy, |
22 | + "deployed_os_workflow": wf.DeployedOS, |
23 | + "ephemeral_os_workflow": wf.EphemeralOS, |
24 | + "power_on": wf.PowerOn, |
25 | + "power_off": wf.PowerOff, |
26 | + "power_query": wf.PowerQuery, |
27 | + "power_cycle": wf.PowerCycle, |
28 | + }), worker.WithAllowedActivities(map[string]interface{}{ |
29 | + "switch_boot_order": wf.SwitchBootOrderActivity, |
30 | + "power": wf.PowerActivity, |
31 | + })) |
32 | + |
33 | workerPoolBackoff := backoff.NewExponentialBackOff() |
34 | workerPoolBackoff.MaxElapsedTime = 60 * time.Second |
35 | |
36 | - _, err = backoff.RetryWithData( |
37 | - func() (*worker.WorkerPool, error) { |
38 | - return worker.NewWorkerPool(cfg.SystemID, client) |
39 | - }, workerPoolBackoff, |
40 | - ) |
41 | - |
42 | + err = backoff.Retry(workerPool.Start, workerPoolBackoff) |
43 | if err != nil { |
44 | - log.Error().Err(err).Msg("Temporal worker pool creation failure") |
45 | + log.Error().Err(err).Msg("Temporal worker pool failure") |
46 | return 1 |
47 | } |
48 | |
49 | @@ -104,9 +116,13 @@ func Run() int { |
50 | |
51 | signal.Notify(sigC, syscall.SIGTERM, syscall.SIGINT) |
52 | |
53 | - <-sigC |
54 | - |
55 | - return 0 |
56 | + select { |
57 | + case err := <-workerPool.Error(): |
58 | + log.Fatal().Err(err).Msg("Temporal worker pool failure") |
59 | + return 1 |
60 | + case <-sigC: |
61 | + return 0 |
62 | + } |
63 | } |
64 | |
65 | // getConfig reads MAAS Agent YAML configuration file |
66 | diff --git a/src/maasagent/internal/workflow/worker/pool.go b/src/maasagent/internal/workflow/worker/pool.go |
67 | index a342e26..05f91ef 100644 |
68 | --- a/src/maasagent/internal/workflow/worker/pool.go |
69 | +++ b/src/maasagent/internal/workflow/worker/pool.go |
70 | @@ -8,71 +8,134 @@ import ( |
71 | |
72 | "go.temporal.io/sdk/activity" |
73 | "go.temporal.io/sdk/client" |
74 | + "go.temporal.io/sdk/temporal" |
75 | "go.temporal.io/sdk/worker" |
76 | "go.temporal.io/sdk/workflow" |
77 | +) |
78 | |
79 | - wf "maas.io/core/src/maasagent/internal/workflow" |
80 | +const ( |
81 | + defaultAddWorkerWorkflowName = "add_worker" |
82 | + defaultRemoveWorkerWorkflowName = "remove_worker" |
83 | + defaultConfigurePoolWorkflowName = "configure_worker_pool" |
84 | + defaultControlPlaneTaskQueueName = "control_plane" |
85 | ) |
86 | |
87 | // WorkerPool contains a collection of Temporal Workers that can be added or |
88 | // removed during runtime by master worker which is responsible for execution of |
89 | // special workflows `add_worker` and `remove_worker`. |
90 | type WorkerPool struct { |
91 | + fatal chan error |
92 | client client.Client |
93 | // worker for control plane workflows like Add or Remove workers |
94 | - master worker.Worker |
95 | - // collection of workflows allowed for registration |
96 | - workflows map[string]interface{} |
97 | - // collection of activities allowed for registration |
98 | - activities map[string]interface{} |
99 | - workers map[string]worker.Worker |
100 | - systemID string |
101 | - mutex sync.Mutex |
102 | + master worker.Worker |
103 | + workers map[string]worker.Worker |
104 | + allowedWorkflows map[string]interface{} |
105 | + allowedActivities map[string]interface{} |
106 | + systemID string |
107 | + addWorkerWorkflowName string |
108 | + removeWorkerWorkflowName string |
109 | + configurePoolWorkflowName string |
110 | + controlPlaneTaskQueueName string |
111 | + mutex sync.Mutex |
112 | } |
113 | |
114 | // NewWorkerPool returns WorkerPool that has a master worker listening to a |
115 | // Temporal Task Queue named after systemID |
116 | -func NewWorkerPool(systemID string, client client.Client) (*WorkerPool, error) { |
117 | +func NewWorkerPool(systemID string, client client.Client, |
118 | + options ...WorkerPoolOption) *WorkerPool { |
119 | pool := &WorkerPool{ |
120 | - systemID: systemID, |
121 | - client: client, |
122 | - workers: make(map[string]worker.Worker), |
123 | - workflows: map[string]interface{}{ |
124 | - "check_ip": wf.CheckIP, |
125 | - "commission": wf.Commission, |
126 | - "deploy": wf.Deploy, |
127 | - "deployed_os_workflow": wf.DeployedOS, |
128 | - "ephemeral_os_workflow": wf.EphemeralOS, |
129 | - "power_on": wf.PowerOn, |
130 | - "power_off": wf.PowerOff, |
131 | - "power_query": wf.PowerQuery, |
132 | - "power_cycle": wf.PowerCycle, |
133 | - }, |
134 | - activities: map[string]interface{}{ |
135 | - "switch_boot_order": wf.SwitchBootOrderActivity, |
136 | - "power": wf.PowerActivity, |
137 | - }, |
138 | + systemID: systemID, |
139 | + client: client, |
140 | + workers: make(map[string]worker.Worker), |
141 | + addWorkerWorkflowName: defaultAddWorkerWorkflowName, |
142 | + removeWorkerWorkflowName: defaultRemoveWorkerWorkflowName, |
143 | + configurePoolWorkflowName: defaultConfigurePoolWorkflowName, |
144 | + controlPlaneTaskQueueName: defaultControlPlaneTaskQueueName, |
145 | + } |
146 | + |
147 | + for _, opt := range options { |
148 | + opt(pool) |
149 | } |
150 | |
151 | // master worker is responsible for adding/removing workers to/from the pool |
152 | - pool.master = worker.New(client, systemID, worker.Options{}) |
153 | + pool.master = worker.New(client, systemID, worker.Options{ |
154 | + Identity: fmt.Sprintf("%s:master", systemID), |
155 | + OnFatalError: func(err error) { pool.fatal <- err }, |
156 | + }) |
157 | |
158 | - var opts workflow.RegisterOptions |
159 | - opts = workflow.RegisterOptions{ |
160 | - Name: "add_worker", |
161 | - } |
162 | pool.master.RegisterWorkflowWithOptions( |
163 | - localActivityExec[addWorkerParam](pool.addWorker), opts, |
164 | + localActivityExec[addWorkerParam](pool.addWorker), |
165 | + workflow.RegisterOptions{ |
166 | + Name: pool.addWorkerWorkflowName, |
167 | + }, |
168 | ) |
169 | |
170 | - opts = workflow.RegisterOptions{ |
171 | - Name: "remove_worker", |
172 | - } |
173 | pool.master.RegisterWorkflowWithOptions( |
174 | - localActivityExec[removeWorkerParam](pool.removeWorker), opts, |
175 | + localActivityExec[removeWorkerParam](pool.removeWorker), |
176 | + workflow.RegisterOptions{ |
177 | + Name: pool.removeWorkerWorkflowName, |
178 | + }, |
179 | ) |
180 | |
181 | - return pool, pool.master.Start() |
182 | + return pool |
183 | +} |
184 | + |
185 | +// Start starts the master worker process that controls worker pool |
186 | +func (p *WorkerPool) Start() error { |
187 | + return p.master.Start() |
188 | +} |
189 | + |
190 | +func (p *WorkerPool) Error() chan error { |
191 | + return p.fatal |
192 | +} |
193 | + |
194 | +// WorkerPoolOption allows to set additional WorkerPool options |
195 | +type WorkerPoolOption func(*WorkerPool) |
196 | + |
197 | +// WithAddWorkerWorkflowName sets custom addWorkerWorkflowName |
198 | +// (default: "add_worker") |
199 | +func WithAddWorkerWorkflowName(s string) WorkerPoolOption { |
200 | + return func(p *WorkerPool) { |
201 | + p.addWorkerWorkflowName = s |
202 | + } |
203 | +} |
204 | + |
205 | +// WithRemoveWorkerWorkflowName sets custom removeWorkerWorkflowName |
206 | +// (default: "remove_worker") |
207 | +func WithRemoveWorkerWorkflowName(s string) WorkerPoolOption { |
208 | + return func(p *WorkerPool) { |
209 | + p.removeWorkerWorkflowName = s |
210 | + } |
211 | +} |
212 | + |
213 | +// WithConfigurePoolWorkflowName sets custom configurePoolWorkflowName |
214 | +// (default: "configure_worker_pool") |
215 | +func WithConfigurePoolWorkflowName(s string) WorkerPoolOption { |
216 | + return func(p *WorkerPool) { |
217 | + p.configurePoolWorkflowName = s |
218 | + } |
219 | +} |
220 | + |
221 | +// WithControlPlaneTaskQueueName sets custom controlPlaneTaskQueueName |
222 | +// (default: "control_plane") |
223 | +func WithControlPlaneTaskQueueName(s string) WorkerPoolOption { |
224 | + return func(p *WorkerPool) { |
225 | + p.controlPlaneTaskQueueName = s |
226 | + } |
227 | +} |
228 | + |
229 | +// WithAllowedWorkflows sets workflows allowed to be registered |
230 | +func WithAllowedWorkflows(workflows map[string]interface{}) WorkerPoolOption { |
231 | + return func(p *WorkerPool) { |
232 | + p.allowedWorkflows = workflows |
233 | + } |
234 | +} |
235 | + |
236 | +// WithAllowedActivities sets activities allowed to be registered |
237 | +func WithAllowedActivities(activities map[string]interface{}) WorkerPoolOption { |
238 | + return func(p *WorkerPool) { |
239 | + p.allowedActivities = activities |
240 | + } |
241 | } |
242 | |
243 | // configureWorkerPoolParam is a parameter that should be provided to the |
244 | @@ -109,25 +172,32 @@ func (p *WorkerPool) addWorker(param addWorkerParam) error { |
245 | defer p.mutex.Unlock() |
246 | |
247 | if _, ok := p.workers[param.TaskQueue]; ok { |
248 | - return fmt.Errorf("worker for TaskQueue %s is already registered in the pool", param.TaskQueue) |
249 | + return failedToAddWorkerError(param.TaskQueue) |
250 | } |
251 | |
252 | - w := worker.New(p.client, param.TaskQueue, worker.Options{}) |
253 | + w := worker.New(p.client, param.TaskQueue, worker.Options{ |
254 | + Identity: fmt.Sprintf("%s:%s", p.systemID, param.TaskQueue), |
255 | + OnFatalError: func(err error) { p.fatal <- err }, |
256 | + }) |
257 | |
258 | - for _, wf := range param.Workflows { |
259 | - if fn, ok := p.workflows[wf]; ok { |
260 | - w.RegisterWorkflowWithOptions(fn, workflow.RegisterOptions{Name: wf}) |
261 | - } |
262 | + if err := register("workflow", param.Workflows, p.allowedWorkflows, |
263 | + func(name string, fn interface{}) { |
264 | + w.RegisterWorkflowWithOptions(fn, workflow.RegisterOptions{Name: name}) |
265 | + }); err != nil { |
266 | + return err |
267 | } |
268 | |
269 | - for _, act := range param.Activities { |
270 | - if fn, ok := p.activities[act]; ok { |
271 | - w.RegisterActivityWithOptions(fn, activity.RegisterOptions{Name: act}) |
272 | - } |
273 | + if err := register("activity", param.Activities, p.allowedActivities, |
274 | + func(name string, fn interface{}) { |
275 | + w.RegisterActivityWithOptions(fn, activity.RegisterOptions{Name: name}) |
276 | + }); err != nil { |
277 | + return err |
278 | } |
279 | |
280 | if err := w.Start(); err != nil { |
281 | - return err |
282 | + w = nil |
283 | + |
284 | + return failedToStartWorkerError(err) |
285 | } |
286 | |
287 | p.workers[param.TaskQueue] = w |
288 | @@ -135,6 +205,24 @@ func (p *WorkerPool) addWorker(param addWorkerParam) error { |
289 | return nil |
290 | } |
291 | |
292 | +func register(t string, s []string, allowed map[string]interface{}, |
293 | + reg func(string, interface{})) error { |
294 | + for _, val := range s { |
295 | + fn, ok := allowed[val] |
296 | + if ok { |
297 | + reg(val, fn) |
298 | + continue |
299 | + } |
300 | + |
301 | + return temporal.NewNonRetryableApplicationError( |
302 | + fmt.Sprintf("Failed registering %s", t), |
303 | + fmt.Sprintf("%sNotAllowed", t), |
304 | + fmt.Errorf("%s %q is not allowed", t, val)) |
305 | + } |
306 | + |
307 | + return nil |
308 | +} |
309 | + |
310 | type removeWorkerParam struct { |
311 | TaskQueue string `json:"task_queue"` |
312 | } |
313 | @@ -145,9 +233,8 @@ func (p *WorkerPool) removeWorker(param removeWorkerParam) error { |
314 | defer p.mutex.Unlock() |
315 | |
316 | w, ok := p.workers[param.TaskQueue] |
317 | - |
318 | if !ok { |
319 | - return nil |
320 | + return failedToRemoveWorkerError(param.TaskQueue) |
321 | } |
322 | |
323 | w.Stop() |
324 | @@ -167,3 +254,23 @@ func localActivityExec[T any](fn any) func(ctx workflow.Context, param T) error |
325 | return workflow.ExecuteLocalActivity(ctx, fn, param).Get(ctx, nil) |
326 | } |
327 | } |
328 | + |
329 | +// failedToAddWorkerError returns a non retryable error |
330 | +func failedToAddWorkerError(taskQueue string) error { |
331 | + return temporal.NewNonRetryableApplicationError("Failed adding worker", |
332 | + "failedToAddWorker", |
333 | + fmt.Errorf("worker for task queue %q already exists", taskQueue)) |
334 | +} |
335 | + |
336 | +// failedToRemoveWorkerError returns a non retryable error |
337 | +func failedToRemoveWorkerError(taskQueue string) error { |
338 | + return temporal.NewNonRetryableApplicationError("Failed removing worker", |
339 | + "failedToRemoveWorker", |
340 | + fmt.Errorf("worker for task queue %q doesn't exist", taskQueue)) |
341 | +} |
342 | + |
343 | +// failedToStartWorkerError returns a non retryable error |
344 | +func failedToStartWorkerError(err error) error { |
345 | + return temporal.NewNonRetryableApplicationError("Failed to start worker", |
346 | + "failedToStartWorker", err) |
347 | +} |
UNIT TESTS
-b refactor-workerpool lp:~troyanov/maas/+git/maas into -b master lp:~maas-committers/maas
STATUS: SUCCESS 842e6c65026a9b7 588a675ec3
COMMIT: b283dfb3bfc9758