Merge ~cgrabowski/maas:service_boilerplate into ~cgrabowski/maas:rack_region_redesign_spike

Proposed by Christian Grabowski
Status: Merged
Approved by: Christian Grabowski
Approved revision: 347bba340239c69d7ff6ac1da083ea7aea29fb78
Merged at revision: 347bba340239c69d7ff6ac1da083ea7aea29fb78
Proposed branch: ~cgrabowski/maas:service_boilerplate
Merge into: ~cgrabowski/maas:rack_region_redesign_spike
Diff against target: 1060 lines (+985/-1)
8 files modified
src/rackd_spike/go.mod (+4/-1)
src/rackd_spike/go.sum (+5/-0)
src/rackd_spike/internal/service/proc.go (+144/-0)
src/rackd_spike/internal/service/proc_test.go (+67/-0)
src/rackd_spike/internal/service/supervisor.go (+225/-0)
src/rackd_spike/internal/service/supervisor_test.go (+170/-0)
src/rackd_spike/internal/service/supervisord.go (+182/-0)
src/rackd_spike/internal/service/systemd.go (+188/-0)
Reviewer Review Type Date Requested Status
Alexsander de Souza (community) Approve
Christian Grabowski Pending
Review via email: mp+404904@code.launchpad.net

Commit message

add supervisord services

add systemd service

add process services

add service supervisor

bump go version

To post a comment you must log in.
Revision history for this message
Alexsander de Souza (alexsander-souza) wrote :

needs some fixes

review: Needs Fixing
Revision history for this message
Christian Grabowski (cgrabowski) wrote :

A few things I'm gonna change, and a few responses to things

Revision history for this message
Christian Grabowski (cgrabowski) :
Revision history for this message
Alexsander de Souza (alexsander-souza) :
review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1diff --git a/src/rackd_spike/go.mod b/src/rackd_spike/go.mod
2index 2a2e641..99ccd82 100644
3--- a/src/rackd_spike/go.mod
4+++ b/src/rackd_spike/go.mod
5@@ -1,8 +1,11 @@
6 module rackd
7
8-go 1.13
9+go 1.16
10
11 require (
12+ github.com/coreos/go-systemd/v22 v22.3.2
13+ github.com/godbus/dbus/v5 v5.0.4 // indirect
14+ github.com/kolo/xmlrpc v0.0.0-20201022064351-38db28db192b // indirect
15 github.com/prometheus/client_golang v0.9.3
16 github.com/rs/zerolog v1.23.0
17 github.com/spf13/cobra v1.1.3
18diff --git a/src/rackd_spike/go.sum b/src/rackd_spike/go.sum
19index 65479d3..f7a0a9f 100644
20--- a/src/rackd_spike/go.sum
21+++ b/src/rackd_spike/go.sum
22@@ -29,7 +29,9 @@ github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDk
23 github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk=
24 github.com/coreos/etcd v3.3.13+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
25 github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
26+github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e h1:Wf6HqHfScWJN9/ZjdUKyjop4mf3Qdd+1TvvltAvM3m8=
27 github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
28+github.com/coreos/go-systemd/v22 v22.3.2 h1:D9/bQk5vlXQFZ6Kwuu6zaiXJ9oTPe68++AzAJc1DzSI=
29 github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
30 github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
31 github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
32@@ -45,6 +47,7 @@ github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2
33 github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
34 github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
35 github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
36+github.com/godbus/dbus/v5 v5.0.4 h1:9349emZab16e7zQvpmsbtjc18ykshndd8y2PG3sgJbA=
37 github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
38 github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
39 github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
40@@ -100,6 +103,8 @@ github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfV
41 github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
42 github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
43 github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
44+github.com/kolo/xmlrpc v0.0.0-20201022064351-38db28db192b h1:iNjcivnc6lhbvJA3LD622NPrUponluJrBWPIwGG/3Bg=
45+github.com/kolo/xmlrpc v0.0.0-20201022064351-38db28db192b/go.mod h1:pcaDhQK0/NJZEvtCO0qQPPropqV0sJOJ6YW7X+9kRwM=
46 github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
47 github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
48 github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
49diff --git a/src/rackd_spike/internal/service/proc.go b/src/rackd_spike/internal/service/proc.go
50new file mode 100644
51index 0000000..ccc959b
52--- /dev/null
53+++ b/src/rackd_spike/internal/service/proc.go
54@@ -0,0 +1,144 @@
55+package service
56+
57+import (
58+ "context"
59+ "fmt"
60+ "os"
61+ "os/exec"
62+ "sync"
63+ "syscall"
64+)
65+
// ExecService is a Service implementation backed by a child process that is
// launched directly through os/exec.
type ExecService struct {
	// The embedded lock is taken by the methods that read or write pid and cmd.
	sync.RWMutex

	name string // name reported by Name
	t    int    // service type reported by Type
	pid  int    // PID of the launched process; -1 while none is recorded

	cmd    *exec.Cmd // handle of the most recently launched process
	cmdStr string    // executable passed to exec
	args   []string  // arguments passed to the executable
}
75+
76+func NewExecService(name string, t int, cmd string, args ...string) Service {
77+ return &ExecService{
78+ name: name,
79+ t: t,
80+ pid: -1,
81+ cmdStr: cmd,
82+ args: args,
83+ }
84+}
85+
86+func (e *ExecService) Name() string {
87+ return e.name
88+}
89+
90+func (e *ExecService) Type() int {
91+ return e.t
92+}
93+
94+func (e *ExecService) PID() int {
95+ e.RLock()
96+ defer e.RUnlock()
97+ return e.pid
98+}
99+
100+func (e *ExecService) Start(ctx context.Context) (err error) {
101+ e.Lock()
102+ defer e.Unlock()
103+ if e.pid != -1 && e.cmd != nil && !e.cmd.ProcessState.Exited() {
104+ return ErrServiceAlreadyRunning
105+ }
106+ defer func() {
107+ if err == nil {
108+ e.pid = e.cmd.Process.Pid
109+ }
110+ }()
111+ if e.cmd != nil && e.cmd.ProcessState.Exited() {
112+ err = e.cmd.Process.Release()
113+ if err != nil {
114+ return err
115+ }
116+ }
117+ e.cmd = exec.CommandContext(ctx, e.cmdStr, e.args...)
118+ return e.cmd.Start()
119+}
120+
121+func (e *ExecService) Stop(ctx context.Context) (err error) {
122+ e.Lock()
123+ defer e.Unlock()
124+ if e.pid == -1 || e.cmd == nil || (e.cmd != nil && e.cmd.ProcessState != nil && e.cmd.ProcessState.Exited()) {
125+ return ErrServiceAlreadyStopped
126+ }
127+ defer func() {
128+ if err == nil {
129+ e.pid = -1
130+ e.cmd = nil
131+ }
132+ }()
133+ procExitChan := make(chan error)
134+ go func() {
135+ select {
136+ case <-ctx.Done():
137+ close(procExitChan)
138+ return
139+ case procExitChan <- e.cmd.Wait():
140+ }
141+ }()
142+ err = e.cmd.Process.Signal(syscall.SIGTERM)
143+ if err != nil {
144+ return err
145+ }
146+ select {
147+ case <-ctx.Done():
148+ err = e.cmd.Process.Kill()
149+ if err != nil {
150+ return err
151+ }
152+ case <-procExitChan:
153+ // TODO handle if SIGTERM caused the process to shutdown uncleanly
154+ return nil
155+ }
156+ return ctx.Err()
157+}
158+
159+func (e *ExecService) Restart(ctx context.Context) (err error) {
160+ err = e.Stop(ctx)
161+ if err != nil {
162+ return err
163+ }
164+ return e.Start(ctx)
165+}
166+
167+func (e *ExecService) Status(_ context.Context) error {
168+ e.RLock()
169+ defer e.RUnlock()
170+ if e.cmd.ProcessState != nil && (!e.cmd.ProcessState.Exited() || e.cmd.ProcessState.Success()) {
171+ return nil
172+ }
173+ return fmt.Errorf("%w: service exited: %d", ErrUnexpectedServiceExit, e.cmd.ProcessState.ExitCode())
174+}
175+
176+type ReloadableExecService struct {
177+ ExecService
178+ ReloadSig os.Signal
179+}
180+
181+func NewReloadableExecService(sig os.Signal, name string, t int, cmd string, args ...string) ReloadableService {
182+ return &ReloadableExecService{
183+ ExecService: ExecService{
184+ name: name,
185+ t: t,
186+ pid: -1,
187+ cmdStr: cmd,
188+ args: args,
189+ },
190+ ReloadSig: sig,
191+ }
192+}
193+
194+func (r *ReloadableExecService) Reload(_ context.Context) error {
195+ r.RLock()
196+ defer r.RUnlock()
197+ return r.cmd.Process.Signal(r.ReloadSig)
198+}
199diff --git a/src/rackd_spike/internal/service/proc_test.go b/src/rackd_spike/internal/service/proc_test.go
200new file mode 100644
201index 0000000..520a3cf
202--- /dev/null
203+++ b/src/rackd_spike/internal/service/proc_test.go
204@@ -0,0 +1,67 @@
205+package service
206+
207+import (
208+ "context"
209+ "errors"
210+ "testing"
211+)
212+
213+const testProc = "/usr/bin/yes"
214+
215+func TestExecService(t *testing.T) {
216+ table := []struct {
217+ Name string
218+ In struct {
219+ Name string
220+ Type int
221+ Cmd string
222+ Args []string
223+ }
224+ Out error
225+ CleanupOut error
226+ Action string
227+ Cleanup string
228+ }{{
229+ Name: "start_new_service",
230+ In: struct {
231+ Name string
232+ Type int
233+ Cmd string
234+ Args []string
235+ }{
236+ Name: "mysvc",
237+ Type: SvcDHCP,
238+ Cmd: testProc,
239+ Args: []string{"hello"},
240+ },
241+ Action: "start",
242+ Cleanup: "stop",
243+ }}
244+ for _, tcase := range table {
245+ t.Run(tcase.Name, func(tt *testing.T) {
246+ svc := NewExecService(tcase.In.Name, tcase.In.Type, tcase.In.Cmd, tcase.In.Args...)
247+ ctx := context.Background()
248+ var err error
249+ switch tcase.Action {
250+ case "start":
251+ err = svc.Start(ctx)
252+ case "stop":
253+ err = svc.Stop(ctx)
254+ case "restart":
255+ err = svc.Restart(ctx)
256+ }
257+ if !errors.Is(err, tcase.Out) {
258+ tt.Fatalf("expected %v to equal %v", err, tcase.Out)
259+ }
260+ switch tcase.Cleanup {
261+ case "stop":
262+ err = svc.Stop(ctx)
263+ default:
264+ return
265+ }
266+ if !errors.Is(err, tcase.CleanupOut) {
267+ tt.Fatalf("expectd %v to equal %v", err, tcase.CleanupOut)
268+ }
269+ })
270+ }
271+}
272diff --git a/src/rackd_spike/internal/service/supervisor.go b/src/rackd_spike/internal/service/supervisor.go
273new file mode 100644
274index 0000000..8e5d308
275--- /dev/null
276+++ b/src/rackd_spike/internal/service/supervisor.go
277@@ -0,0 +1,225 @@
278+package service
279+
280+import (
281+ "context"
282+ "errors"
283+ "sync"
284+)
285+
// Service type identifiers used to group services.
const (
	SvcUnknown = 0
	SvcDHCP    = 1
	SvcDNS     = 2
	SvcNTP     = 3
	SvcPROXY   = 4
)

// ErrUnknownService is returned when a lookup finds no registered service.
var ErrUnknownService = errors.New("service is not registered with supervisor")

// ErrServiceAlreadyRunning is returned when starting a service that is running.
var ErrServiceAlreadyRunning = errors.New("service is already in a running state")

// ErrServiceAlreadyStopped is returned when stopping a service that is stopped.
var ErrServiceAlreadyStopped = errors.New("service is already in a stopped state")

// ErrInvalidServiceState is returned when an operation is not valid for the
// service's current state.
var ErrInvalidServiceState = errors.New("service is in an invalid state for this operation")

// ErrUnexpectedServiceExit is wrapped by status checks that find the service
// has exited.
var ErrUnexpectedServiceExit = errors.New("service exited unexpectedly")
301+
// Service is the behaviour the supervisor needs from anything it manages:
// identity (name, type, PID) plus lifecycle control and a status check.
type Service interface {
	// Identity.
	Name() string
	Type() int
	PID() int

	// Lifecycle.
	Start(context.Context) error
	Stop(context.Context) error
	Restart(context.Context) error

	// Health.
	Status(context.Context) error
}

// ReloadableService is a Service that can also be told to reload.
type ReloadableService interface {
	Service
	Reload(context.Context) error
}
316+
317+type SvcManager interface {
318+ RegisterService(Service)
319+ StartAll(context.Context) error
320+ Start(context.Context, string) error
321+ StartByType(context.Context, int) error
322+ StopAll(context.Context) error
323+ Stop(context.Context, string) error
324+ StopByType(context.Context, int) error
325+ Restart(context.Context, string) error
326+ RestartByType(context.Context, int) error
327+ Get(string) (Service, error)
328+ GetType(int) ([]Service, error)
329+ GetPID(int) (Service, error)
330+}
331+
332+type Supervisor struct {
333+ sync.RWMutex
334+ procsByPID map[int]Service
335+ procsByName map[string]Service
336+ procsByType map[int][]Service
337+}
338+
339+func NewSupervisor() *Supervisor {
340+ return &Supervisor{
341+ procsByPID: make(map[int]Service),
342+ procsByName: make(map[string]Service),
343+ procsByType: make(map[int][]Service),
344+ }
345+}
346+
347+func (s *Supervisor) RegisterService(svc Service) {
348+ s.procsByName[svc.Name()] = svc
349+ svcs := s.procsByType[svc.Type()]
350+ s.procsByType[svc.Type()] = append(svcs, svc)
351+ if pid := svc.PID(); pid != -1 {
352+ s.procsByPID[pid] = svc
353+ }
354+}
355+
356+func (s *Supervisor) StartAll(ctx context.Context) error {
357+ for _, svc := range s.procsByName {
358+ err := svc.Start(ctx)
359+ if err != nil {
360+ return err
361+ }
362+ }
363+ return nil
364+}
365+
366+func (s *Supervisor) StopAll(ctx context.Context) error {
367+ for _, svc := range s.procsByPID {
368+ err := svc.Stop(ctx)
369+ if err != nil {
370+ return err
371+ }
372+ }
373+ return nil
374+}
375+
376+func (s *Supervisor) Start(ctx context.Context, name string) (err error) {
377+ svc, err := s.Get(name)
378+ if err != nil {
379+ return err
380+ }
381+ pid := svc.PID()
382+ if _, err = s.GetByPID(pid); err == nil || pid != -1 {
383+ return ErrServiceAlreadyRunning
384+ }
385+ defer func() {
386+ if err == nil {
387+ s.Lock()
388+ defer s.Unlock()
389+ s.procsByPID[svc.PID()] = svc
390+ }
391+ }()
392+ return svc.Start(ctx)
393+}
394+
395+func (s *Supervisor) StartByType(ctx context.Context, t int) error {
396+ svcs, err := s.GetByType(t)
397+ if err != nil {
398+ return err
399+ }
400+ for _, svc := range svcs {
401+ err = s.Start(ctx, svc.Name())
402+ if err != nil {
403+ return err
404+ }
405+ }
406+ return nil
407+}
408+
409+func (s *Supervisor) Stop(ctx context.Context, name string) (err error) {
410+ svc, err := s.Get(name)
411+ if err != nil {
412+ return err
413+ }
414+ pid := svc.PID()
415+ if _, err = s.GetByPID(svc.PID()); err != nil || pid == -1 {
416+ return ErrServiceAlreadyStopped
417+ }
418+ defer func() {
419+ if err == nil {
420+ s.Lock()
421+ defer s.Unlock()
422+ delete(s.procsByPID, pid)
423+ }
424+ }()
425+ return svc.Stop(ctx)
426+}
427+
428+func (s *Supervisor) StopByType(ctx context.Context, t int) error {
429+ svcs, err := s.GetByType(t)
430+ if err != nil {
431+ return err
432+ }
433+ for _, svc := range svcs {
434+ err = s.Stop(ctx, svc.Name())
435+ if err != nil {
436+ return err
437+ }
438+ }
439+ return nil
440+}
441+
442+func (s *Supervisor) Restart(ctx context.Context, name string) (err error) {
443+ svc, err := s.Get(name)
444+ if err != nil {
445+ return err
446+ }
447+ pid := svc.PID()
448+ if _, hasPID := s.procsByPID[pid]; !hasPID || pid == -1 {
449+ return ErrInvalidServiceState
450+ }
451+ defer func() {
452+ if err == nil {
453+ s.Lock()
454+ defer s.Unlock()
455+ // Restart should change PIDs
456+ delete(s.procsByPID, pid)
457+ s.procsByPID[svc.PID()] = svc
458+ }
459+ }()
460+ return svc.Restart(ctx)
461+}
462+
463+func (s *Supervisor) RestartByType(ctx context.Context, t int) error {
464+ svcs, err := s.GetByType(t)
465+ if err != nil {
466+ return err
467+ }
468+ for _, svc := range svcs {
469+ err = s.Restart(ctx, svc.Name())
470+ if err != nil {
471+ return err
472+ }
473+ }
474+ return nil
475+}
476+
477+func (s *Supervisor) Get(name string) (Service, error) {
478+ s.RLock()
479+ defer s.RUnlock()
480+ if svc, ok := s.procsByName[name]; ok {
481+ return svc, nil
482+ }
483+ return nil, ErrUnknownService
484+}
485+
486+func (s *Supervisor) GetByType(t int) ([]Service, error) {
487+ s.RLock()
488+ defer s.RUnlock()
489+ if svcs, ok := s.procsByType[t]; ok {
490+ return svcs, nil
491+ }
492+ return nil, ErrUnknownService
493+}
494+
495+func (s *Supervisor) GetByPID(pid int) (Service, error) {
496+ s.RLock()
497+ defer s.RUnlock()
498+ if svc, ok := s.procsByPID[pid]; ok {
499+ return svc, nil
500+ }
501+ return nil, ErrUnknownService
502+}
503diff --git a/src/rackd_spike/internal/service/supervisor_test.go b/src/rackd_spike/internal/service/supervisor_test.go
504new file mode 100644
505index 0000000..7e65701
506--- /dev/null
507+++ b/src/rackd_spike/internal/service/supervisor_test.go
508@@ -0,0 +1,170 @@
509+package service
510+
511+import (
512+ "context"
513+ "errors"
514+ "testing"
515+)
516+
517+type MockService struct {
518+ Service
519+ pid int
520+ name string
521+ t int
522+ startErr error
523+ stopErr error
524+ restartErr error
525+}
526+
527+func (m MockService) Name() string {
528+ return m.name
529+}
530+
531+func (m MockService) PID() int {
532+ return m.pid
533+}
534+
535+func (m MockService) Type() int {
536+ return m.t
537+}
538+
539+func (m MockService) Start(ctx context.Context) error {
540+ return m.startErr
541+}
542+
543+func (m MockService) Stop(ctx context.Context) error {
544+ return m.stopErr
545+}
546+
547+func (m MockService) Restart(ctx context.Context) error {
548+ return m.restartErr
549+}
550+
551+func TestSupvisor(t *testing.T) {
552+ table := []struct {
553+ Name string
554+ In Service
555+ Out error
556+ Action string
557+ }{{
558+ Name: "start_new_valid_service",
559+ In: MockService{
560+ pid: -1,
561+ name: "mysvc",
562+ t: SvcDHCP,
563+ },
564+ Action: "start",
565+ }, {
566+ Name: "start_all_new_valid_service",
567+ In: MockService{
568+ pid: -1,
569+ name: "mysvc",
570+ t: SvcDHCP,
571+ },
572+ Action: "start_all",
573+ }, {
574+ Name: "start_type_new_valid_service",
575+ In: MockService{
576+ pid: -1,
577+ name: "mysvc",
578+ t: SvcDHCP,
579+ },
580+ Action: "start_type",
581+ }, {
582+ Name: "start_started_service",
583+ In: MockService{
584+ pid: 1,
585+ name: "mysvc",
586+ t: SvcDHCP,
587+ },
588+ Out: ErrServiceAlreadyRunning,
589+ Action: "start",
590+ }, {
591+ Name: "stop_started_service",
592+ In: MockService{
593+ pid: 1,
594+ name: "mysvc",
595+ t: SvcDHCP,
596+ },
597+ Action: "stop",
598+ }, {
599+ Name: "stop_stopped_service",
600+ In: MockService{
601+ pid: -1,
602+ name: "mysvc",
603+ t: SvcDHCP,
604+ },
605+ Out: ErrServiceAlreadyStopped,
606+ Action: "stop",
607+ }, {
608+ Name: "stop_type_started_service",
609+ In: MockService{
610+ pid: 1,
611+ name: "mysvc",
612+ t: SvcDHCP,
613+ },
614+ Action: "stop_type",
615+ }, {
616+ Name: "stop_all_started_service",
617+ In: MockService{
618+ pid: 1,
619+ name: "mysvc",
620+ t: SvcDHCP,
621+ },
622+ Action: "stop_all",
623+ }, {
624+ Name: "restart_started_service",
625+ In: MockService{
626+ pid: 1,
627+ name: "mysvc",
628+ t: SvcDHCP,
629+ },
630+ Action: "restart",
631+ }, {
632+ Name: "restart_stopped_service",
633+ In: MockService{
634+ pid: -1,
635+ name: "mysvc",
636+ t: SvcDHCP,
637+ },
638+ Out: ErrInvalidServiceState,
639+ Action: "restart",
640+ }, {
641+ Name: "restart_by_type_started_service",
642+ In: MockService{
643+ pid: 1,
644+ name: "mysvc",
645+ t: SvcDHCP,
646+ },
647+ Action: "restart_by_type",
648+ }}
649+ for _, tcase := range table {
650+ t.Run(tcase.Name, func(tt *testing.T) {
651+ sup := NewSupervisor()
652+ sup.RegisterService(tcase.In)
653+ ctx := context.Background()
654+ var err error
655+ switch tcase.Action {
656+ case "start_all":
657+ err = sup.StartAll(ctx)
658+ case "start":
659+ err = sup.Start(ctx, tcase.In.Name())
660+ case "start_by_type":
661+ err = sup.StartByType(ctx, tcase.In.Type())
662+ case "stop_all":
663+ err = sup.StopAll(ctx)
664+ case "stop":
665+ err = sup.Stop(ctx, tcase.In.Name())
666+ case "stop_by_type":
667+ err = sup.StopByType(ctx, tcase.In.Type())
668+ case "restart":
669+ err = sup.Restart(ctx, tcase.In.Name())
670+ case "restart_by_type":
671+ err = sup.RestartByType(ctx, tcase.In.Type())
672+ }
673+ if !errors.Is(err, tcase.Out) {
674+ tt.Fatalf("expected %v to equal %v", err, tcase.Out)
675+ }
676+ })
677+ }
678+}
679diff --git a/src/rackd_spike/internal/service/supervisord.go b/src/rackd_spike/internal/service/supervisord.go
680new file mode 100644
681index 0000000..71e0e31
682--- /dev/null
683+++ b/src/rackd_spike/internal/service/supervisord.go
684@@ -0,0 +1,182 @@
685+package service
686+
687+import (
688+ "context"
689+ "errors"
690+ "fmt"
691+ "net/rpc"
692+ "strconv"
693+ "sync"
694+
695+ "github.com/kolo/xmlrpc"
696+)
697+
698+var (
699+ xmlrpcConnOnce = &sync.Once{}
700+ xmlrpcCloseOnce = &sync.Once{}
701+)
702+
703+var (
704+ supConn *xmlrpc.Client
705+)
706+
707+var (
708+ ErrSupervisordPIDNotFound = errors.New("error supervisord service pid not found")
709+ ErrSupervisordUnsupportedType = errors.New("received an unsupported type from supervisord")
710+)
711+
712+func getSupervisordConn(endpoint string) (*xmlrpc.Client, error) {
713+ var err error
714+ xmlrpcConnOnce.Do(func() {
715+ supConn, err = xmlrpc.NewClient(endpoint, nil)
716+ })
717+ return supConn, err
718+}
719+
720+func CloseSupervisordConn() error {
721+ var err error
722+ xmlrpcCloseOnce.Do(func() {
723+ err = supConn.Close()
724+ })
725+ return err
726+}
727+
728+type SupervisordService struct {
729+ sync.RWMutex
730+ conn *xmlrpc.Client
731+ name string
732+ pid int
733+ t int
734+}
735+
736+func NewSupervisordService(endpoint, name string, t int) (Service, error) {
737+ conn, err := getSupervisordConn(endpoint)
738+ if err != nil {
739+ return nil, err
740+ }
741+ return &SupervisordService{
742+ conn: conn,
743+ name: name,
744+ t: t,
745+ }, nil
746+}
747+
748+func (s *SupervisordService) Name() string {
749+ return s.name
750+}
751+
752+func (s *SupervisordService) Type() int {
753+ return s.t
754+}
755+
756+func (s *SupervisordService) PID() int {
757+ s.RLock()
758+ defer s.RUnlock()
759+ return s.pid
760+}
761+
762+func (s *SupervisordService) readPIDFromResult(res map[string]interface{}) error {
763+ if pidField, ok := res["pid"]; ok {
764+ s.Lock()
765+ defer s.Unlock()
766+ var err error
767+ switch pid := pidField.(type) {
768+ case int:
769+ s.pid = pid
770+ case int32:
771+ s.pid = int(pid)
772+ case int64:
773+ s.pid = int(pid)
774+ case float32:
775+ s.pid = int(pid)
776+ case float64:
777+ s.pid = int(pid)
778+ case string:
779+ s.pid, err = strconv.Atoi(pid)
780+ if err != nil {
781+ s.pid = -1
782+ return err
783+ }
784+ default:
785+ return fmt.Errorf("%w: pid:%v", ErrSupervisordUnsupportedType, pid)
786+ }
787+ return nil
788+ }
789+ return ErrSupervisordPIDNotFound
790+}
791+
792+func (s *SupervisordService) Start(ctx context.Context) error {
793+ resChan := make(chan *rpc.Call)
794+ s.conn.Go("startProcess", []interface{}{s.name}, nil, resChan)
795+ select {
796+ case <-ctx.Done():
797+ return ctx.Err()
798+ case call := <-resChan:
799+ if call.Error != nil {
800+ return call.Error
801+ }
802+ res, err := s.getProcInfo(ctx)
803+ if err != nil {
804+ return err
805+ }
806+ err = s.readPIDFromResult(res)
807+ if err != nil {
808+ return err
809+ }
810+ }
811+ return nil
812+}
813+
814+func (s *SupervisordService) Stop(ctx context.Context) error {
815+ resChan := make(chan *rpc.Call)
816+ s.conn.Go("stopProcess", []interface{}{s.name}, nil, resChan)
817+ select {
818+ case <-ctx.Done():
819+ return ctx.Err()
820+ case call := <-resChan:
821+ if call.Error != nil {
822+ return call.Error
823+ }
824+ s.Lock()
825+ defer s.Unlock()
826+ s.pid = -1
827+ }
828+ return nil
829+}
830+
831+func (s *SupervisordService) Restart(ctx context.Context) error {
832+ err := s.Stop(ctx)
833+ if err != nil {
834+ return err
835+ }
836+ return s.Start(ctx)
837+}
838+
839+func (s *SupervisordService) getProcInfo(ctx context.Context) (map[string]interface{}, error) {
840+ resChan := make(chan *rpc.Call)
841+ s.conn.Go("getProcessInfo", []interface{}{s.name}, nil, resChan)
842+ select {
843+ case <-ctx.Done():
844+ return nil, ctx.Err()
845+ case call := <-resChan:
846+ if call.Error != nil {
847+ return nil, call.Error
848+ }
849+ res, ok := call.Reply.(map[string]interface{})
850+ if !ok {
851+ return nil, ErrSupervisordUnsupportedType
852+ }
853+ return res, nil
854+ }
855+}
856+
857+func (s *SupervisordService) Status(ctx context.Context) error {
858+ info, err := s.getProcInfo(ctx)
859+ if err != nil {
860+ return err
861+ }
862+ if info["stop"] != 0 {
863+ return fmt.Errorf("%w: service %s exited", ErrUnexpectedServiceExit, s.name)
864+ }
865+ return nil
866+}
867diff --git a/src/rackd_spike/internal/service/systemd.go b/src/rackd_spike/internal/service/systemd.go
868new file mode 100644
869index 0000000..dbe0117
870--- /dev/null
871+++ b/src/rackd_spike/internal/service/systemd.go
872@@ -0,0 +1,188 @@
873+package service
874+
875+import (
876+ "context"
877+ "errors"
878+ "fmt"
879+ "os"
880+ "strconv"
881+ "sync"
882+
883+ "github.com/coreos/go-systemd/v22/dbus"
884+)
885+
// startFailMode is the job mode passed to the systemd unit operations.
// TODO add more supported start modes
const startFailMode = "fail"

// Job result strings.
const (
	resultDone       = "done"
	resultCanceled   = "canceled"
	resultTimeout    = "timeout"
	resultFailed     = "failed"
	resultIsolate    = "isolate"
	resultIgnoreDeps = "ignore-dependencies"
	resultIgnoreReqs = "ignore-requirements"
)

// systemdConnOnce guards creation of the shared D-Bus connection.
var systemdConnOnce = &sync.Once{}

// systemdCloseOnce guards closing of the shared D-Bus connection.
var systemdCloseOnce = &sync.Once{}

// Sentinel errors for the systemd backend.
var (
	ErrSystemdStart           = errors.New("error starting systemd unit")
	ErrSystemdStop            = errors.New("error stopping systemd unit")
	ErrSystemdPIDNotFound     = errors.New("error looking up unit's pid in systemd")
	ErrSystemdInvalidPropType = errors.New("error systemd property not of supported type")
	ErrSystemdServiceNotFound = errors.New("error systemd service not found")
	ErrSystemdBadServiceState = errors.New("error systemd service is not active")
)
914+
915+var (
916+ dbusConn *dbus.Conn
917+)
918+
919+func getDBusConn(ctx context.Context) (*dbus.Conn, error) {
920+ var err error
921+ systemdConnOnce.Do(func() {
922+ if os.Getuid() == 0 {
923+ dbusConn, err = dbus.NewSystemConnectionContext(ctx)
924+ if err != nil {
925+ return
926+ }
927+ } else {
928+ dbusConn, err = dbus.NewUserConnectionContext(ctx)
929+ if err != nil {
930+ return
931+ }
932+ }
933+ })
934+ return dbusConn, err
935+}
936+
937+func CloseSystemdConn() {
938+ systemdCloseOnce.Do(func() {
939+ dbusConn.Close()
940+ })
941+}
942+
943+type SystemdService struct {
944+ sync.RWMutex
945+ conn *dbus.Conn
946+ name string
947+ t int
948+ pid int
949+ UnitFile string
950+}
951+
952+func NewSystemdService(ctx context.Context, name string, t int, unitFile string) (ReloadableService, error) {
953+ conn, err := getDBusConn(ctx)
954+ if err != nil {
955+ return nil, err
956+ }
957+ return &SystemdService{
958+ conn: conn,
959+ name: name,
960+ t: t,
961+ UnitFile: unitFile,
962+ }, nil
963+}
964+
965+func (s *SystemdService) Name() string {
966+ return s.name
967+}
968+
969+func (s *SystemdService) Type() int {
970+ return s.t
971+}
972+
973+func (s *SystemdService) PID() int {
974+ s.RLock()
975+ defer s.RUnlock()
976+ return s.pid
977+}
978+
979+func (s *SystemdService) readPIDFromProps(ctx context.Context) error {
980+ s.Lock()
981+ defer s.Unlock()
982+ props, err := s.conn.GetAllPropertiesContext(ctx, s.UnitFile)
983+ if err != nil {
984+ return err
985+ }
986+ if p, ok := props["ExecMainPID"]; ok {
987+ switch pid := p.(type) {
988+ case int:
989+ s.pid = pid
990+ case int32:
991+ s.pid = int(pid)
992+ case int64:
993+ s.pid = int(pid)
994+ case float32:
995+ s.pid = int(pid)
996+ case float64:
997+ s.pid = int(pid)
998+ case string:
999+ s.pid, err = strconv.Atoi(pid)
1000+ if err != nil {
1001+ s.pid = -1
1002+ return err
1003+ }
1004+ default:
1005+ return fmt.Errorf("%w: ExecMainPID", ErrSystemdInvalidPropType)
1006+ }
1007+ return nil
1008+ }
1009+ return ErrSystemdPIDNotFound
1010+}
1011+
1012+func (s *SystemdService) Start(ctx context.Context) error {
1013+ // "fail" mode will error if process is already running or queued to run
1014+ _, err := s.conn.StartUnitContext(ctx, s.UnitFile, startFailMode, nil)
1015+ if err != nil {
1016+ return err
1017+ }
1018+ return s.readPIDFromProps(ctx)
1019+}
1020+
1021+func (s *SystemdService) Stop(ctx context.Context) error {
1022+ _, err := s.conn.StopUnitContext(ctx, s.UnitFile, startFailMode, nil)
1023+ if err != nil {
1024+ return err
1025+ }
1026+ s.Lock()
1027+ defer s.Unlock()
1028+ s.pid = -1
1029+ return nil
1030+}
1031+
1032+func (s *SystemdService) Restart(ctx context.Context) error {
1033+ _, err := s.conn.RestartUnitContext(ctx, s.UnitFile, startFailMode, nil)
1034+ if err != nil {
1035+ return err
1036+ }
1037+ return s.readPIDFromProps(ctx)
1038+}
1039+
1040+func (s *SystemdService) Status(ctx context.Context) error {
1041+ status, err := s.conn.ListUnitsByNamesContext(ctx, []string{s.UnitFile})
1042+ if err != nil {
1043+ return err
1044+ }
1045+ if len(status) == 0 {
1046+ return fmt.Errorf("%w: %s", ErrSystemdServiceNotFound, s.UnitFile)
1047+ }
1048+ if status[0].ActiveState == "active" || status[0].ActiveState == "running" {
1049+ return nil
1050+ }
1051+ return ErrSystemdBadServiceState
1052+}
1053+
1054+func (s *SystemdService) Reload(ctx context.Context) error {
1055+ _, err := s.conn.RestartUnitContext(ctx, s.UnitFile, startFailMode, nil)
1056+ if err != nil {
1057+ return err
1058+ }
1059+ return s.readPIDFromProps(ctx)
1060+}

Subscribers

People subscribed via source and target branches

to all changes: