Merge ~cgrabowski/maas:rack_spike_http into maas:rack_region_exploration

Proposed by Christian Grabowski
Status: Merged
Approved by: Christian Grabowski
Approved revision: dd8d76c96c704e489dadb54592f89f75baf83c3c
Merge reported by: MAAS Lander
Merged at revision: not available
Proposed branch: ~cgrabowski/maas:rack_spike_http
Merge into: maas:rack_region_exploration
Diff against target: 980 lines (+828/-10)
9 files modified
src/maasserver/rpc/capnp/region.py (+26/-4)
src/rackd_spike/cmd/rackd.go (+24/-1)
src/rackd_spike/go-lint.sh (+3/-2)
src/rackd_spike/internal/http/proxy.go (+381/-0)
src/rackd_spike/internal/http/reverse.go (+234/-0)
src/rackd_spike/internal/service/supervisor.go (+2/-0)
src/rackd_spike/internal/transport/rpc.go (+11/-0)
src/rackd_spike/pkg/http/proxy.go (+110/-0)
src/rackd_spike/pkg/region/config.go (+37/-3)
Reviewer Review Type Date Requested Status
MAAS Lander Approve
Alexsander de Souza Approve
Review via email: mp+406257@code.launchpad.net

Commit message

fix build

fix authentication

fix existing services

close connection on error

start supervisor earlier

add get proxy call

configure http services

add reverse proxy

make https urls work

quiet down logs

successfully proxy http

fix existing services

forward requests

add http proxy

To post a comment you must log in.
Revision history for this message
MAAS Lander (maas-lander) wrote :

UNIT TESTS
-b rack_spike_http lp:~cgrabowski/maas/+git/maas into -b rack_region_exploration lp:~maas-committers/maas

STATUS: FAILED
LOG: http://maas-ci.internal:8080/job/maas/job/branch-tester/10593/console
COMMIT: c557636756d200ec77051f36feaca06e097c877e

review: Needs Fixing
~cgrabowski/maas:rack_spike_http updated
54e4f18... by Christian Grabowski

fix linter issues

Revision history for this message
MAAS Lander (maas-lander) wrote :

UNIT TESTS
-b rack_spike_http lp:~cgrabowski/maas/+git/maas into -b rack_region_exploration lp:~maas-committers/maas

STATUS: SUCCESS
COMMIT: 61116646d017559136b515dc7348826891984a56

review: Approve
~cgrabowski/maas:rack_spike_http updated
30c08a9... by Christian Grabowski

use events set in other call

Revision history for this message
MAAS Lander (maas-lander) wrote :

UNIT TESTS
-b rack_spike_http lp:~cgrabowski/maas/+git/maas into -b rack_region_exploration lp:~maas-committers/maas

STATUS: SUCCESS
COMMIT: 3c09c61450b62d1ec3d3d57a3d069dd72d5add0b

review: Approve
~cgrabowski/maas:rack_spike_http updated
581634f... by Christian Grabowski

initialize allowed_cidrs list first

Revision history for this message
MAAS Lander (maas-lander) wrote :

UNIT TESTS
-b rack_spike_http lp:~cgrabowski/maas/+git/maas into -b rack_region_exploration lp:~maas-committers/maas

STATUS: SUCCESS
COMMIT: 581634f038b336abefa7f9f862a311a106a4a573

review: Approve
Revision history for this message
Alexsander de Souza (alexsander-souza) wrote :

I think there's a typo, just fix it before landing this

review: Approve
~cgrabowski/maas:rack_spike_http updated
dd8d76c... by Christian Grabowski

fix typo

Revision history for this message
Christian Grabowski (cgrabowski) :
Revision history for this message
MAAS Lander (maas-lander) wrote :

UNIT TESTS
-b rack_spike_http lp:~cgrabowski/maas/+git/maas into -b rack_region_exploration lp:~maas-committers/maas

STATUS: SUCCESS
COMMIT: dd8d76c96c704e489dadb54592f89f75baf83c3c

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1diff --git a/src/maasserver/rpc/capnp/region.py b/src/maasserver/rpc/capnp/region.py
2index ae8cb64..cbc5f05 100644
3--- a/src/maasserver/rpc/capnp/region.py
4+++ b/src/maasserver/rpc/capnp/region.py
5@@ -105,6 +105,7 @@ class RegionController(region.RegionController.Server):
6 self.shim = RegionServer()
7 self.rack_controllers = rack_controllers
8 self.server = server
9+ self.events = set()
10 super(RegionController, self).__init__()
11
12 def ping(self):
13@@ -183,7 +184,8 @@ class RegionController(region.RegionController.Server):
14 return None
15
16 def getTimeConfiguration_context(self, context):
17- self.event = capnp.PromiseFulfillerPair()
18+ event = capnp.PromiseFulfillerPair()
19+ self.events.add(event)
20 systemId = context.params.systemId
21
22 def get_result(cfg):
23@@ -200,7 +202,8 @@ class RegionController(region.RegionController.Server):
24 lst[i] = s
25
26 context.results.resp = resp
27- self.event.fulfill()
28+ event.fulfill()
29+ self.events.remove(event)
30
31 self.shim.get_time_configuration(systemId).addCallback(get_result)
32 return self.event.promise
33@@ -216,8 +219,27 @@ class RegionController(region.RegionController.Server):
34 self.shim.get_dns_configuration(systemId).addCallback(get_result)
35 return prom
36
37- def getProxyConfiguration(self, systemId):
38- return None
39+ def getProxyConfiguration_context(self, context):
40+ system_id = context.params.systemId
41+ event = capnp.PromiseFulfillerPair()
42+ self.events.add(event)
43+
44+ def get_results(cfg):
45+ resp = controller.ProxyConfiguration()
46+ resp.enabled = cfg.get("enabled", True)
47+ resp.port = cfg.get("port")
48+ allowed_cidrs = cfg.get("allowed_cidrs", [])
49+ lst = resp.init("allowed_cidrs", len(allowed_cidrs))
50+ for i, cidr in enumerate(allowed_cidrs):
51+ lst[i] = cidr
52+ resp.allowedCidrs = lst
53+ resp.preferV4Proxy = cfg.get("prefer_v4_proxy", False)
54+ context.results.proxyConfig = resp
55+ event.fulfill()
56+ self.events.remove(event)
57+
58+ self.shim.get_proxy_configuration(system_id).addCallback(get_results)
59+ return event.promise
60
61 def getSyslogConfiguration(self, systemId):
62 return None
63diff --git a/src/rackd_spike/cmd/rackd.go b/src/rackd_spike/cmd/rackd.go
64index 5091571..1dd491c 100644
65--- a/src/rackd_spike/cmd/rackd.go
66+++ b/src/rackd_spike/cmd/rackd.go
67@@ -15,6 +15,7 @@ import (
68 "rackd/cmd/subcommands"
69 "rackd/internal/config"
70 "rackd/internal/dhcp"
71+ "rackd/internal/http"
72 machinehelpers "rackd/internal/machine_helpers"
73 "rackd/internal/metrics"
74 "rackd/internal/ntp"
75@@ -22,6 +23,7 @@ import (
76 "rackd/internal/transport"
77 "rackd/pkg/authenticate"
78 "rackd/pkg/controller"
79+ httprpc "rackd/pkg/http"
80 ntprpc "rackd/pkg/ntp"
81 "rackd/pkg/region"
82 "rackd/pkg/register"
83@@ -77,8 +79,22 @@ func registerProxyServices(ctx context.Context, sup service.SvcManager) error {
84 if err != nil {
85 return err
86 }
87+ httpProxy, err := http.NewProxy(ctx, "0.0.0.0", 80)
88+ if err != nil {
89+ return err
90+ }
91+ machineResourcesPath, err := machinehelpers.GetResourcesBinPath()
92+ if err != nil {
93+ return err
94+ }
95+ reverseProxy, err := http.NewReverseProxy(ctx, machineResourcesPath, "0.0.0.0", 5248)
96+ if err != nil {
97+ return err
98+ }
99 sup.RegisterService(dhcp.NewRelaySvc())
100 sup.RegisterService(ntpProxy)
101+ sup.RegisterService(httpProxy)
102+ sup.RegisterService(reverseProxy)
103 return nil
104 }
105
106@@ -190,6 +206,13 @@ func runRoot(cmd *cobra.Command, args []string) error {
107 return err
108 }
109 rpcMgr.AddClient(ctx, ntpClient)
110+
111+ proxyClient, err := httprpc.New(sup)
112+ if err != nil {
113+ return err
114+ }
115+ rpcMgr.AddClient(ctx, proxyClient)
116+
117 rackController, err := controller.NewRackController(ctx, true, initRegion, sup)
118 if err != nil {
119 return err
120@@ -201,7 +224,7 @@ func runRoot(cmd *cobra.Command, args []string) error {
121 if err != nil {
122 return err
123 }
124- err = region.GetRemoteConfig(ctx, rpcMgr)
125+ err = region.GetRemoteConfig(ctx, rpcMgr, sup)
126 if err != nil {
127 return err
128 }
129diff --git a/src/rackd_spike/go-lint.sh b/src/rackd_spike/go-lint.sh
130index 6506442..c0e86ae 100755
131--- a/src/rackd_spike/go-lint.sh
132+++ b/src/rackd_spike/go-lint.sh
133@@ -5,7 +5,8 @@ go vet ./... 2>&1 | grep -v \
134 -e 'internal/dhcp/service.go:46' \
135 -e 'internal/dhcp/service.go:74' \
136 -e 'internal/dhcp/service.go:102' \
137- -e 'internal/dhcp/service.go:130'
138+ -e 'internal/dhcp/service.go:130' \
139+ -e 'internal/http/reverse.go:57'
140
141 # invert grep result
142-[ "$?" -ne "0" ]
143\ No newline at end of file
144+[ "$?" -ne "0" ]
145diff --git a/src/rackd_spike/internal/http/proxy.go b/src/rackd_spike/internal/http/proxy.go
146new file mode 100644
147index 0000000..69ff494
148--- /dev/null
149+++ b/src/rackd_spike/internal/http/proxy.go
150@@ -0,0 +1,381 @@
151+package http
152+
153+import (
154+ "context"
155+ "crypto/tls"
156+ "errors"
157+ "fmt"
158+ "io"
159+ stdlibLog "log"
160+ "net"
161+ "net/http"
162+ "os"
163+ "strconv"
164+ "strings"
165+ "sync"
166+ "syscall"
167+ "time"
168+
169+ "github.com/rs/zerolog"
170+
171+ "rackd/internal/service"
172+)
173+
174+var (
175+ ErrCircularRequest = errors.New("error circular request")
176+ ErrNotInternalAddress = errors.New("received external request on internal endpoint")
177+ ErrClientNotAllowed = errors.New("client not found in allowed CIDRs")
178+)
179+
180+type ProxyService interface {
181+ service.Service
182+ Configure(ctx context.Context, enabled, preferV4 bool, port int16, allowedCidrs []*net.IPNet) error
183+}
184+
185+type Proxy struct {
186+ sync.Mutex
187+ srvr *http.Server
188+ client *http.Client
189+ opts *ProxyOptions
190+ selfAddr *net.TCPAddr
191+ enabled bool
192+ allowedCidrs []*net.IPNet
193+ getCtx func() context.Context
194+}
195+
196+func NewProxy(ctx context.Context, bindAddr string, port int) (*Proxy, error) {
197+ return NewProxyWithTLS(ctx, bindAddr, port, nil)
198+}
199+
200+func NewProxyWithTLS(ctx context.Context, bindAddr string, port int, tlsCfg *tls.Config) (*Proxy, error) {
201+ addr := net.JoinHostPort(bindAddr, strconv.Itoa(port))
202+ return NewProxyWithOptions(ctx, ProxyOptions{
203+ FrontendAddr: addr,
204+ FrontendTLSCfg: tlsCfg,
205+ })
206+}
207+
208+type ProxyOptions struct {
209+ MaxHeaderSize int
210+ HTTP2Enabled bool
211+ FrontendTLSCfg *tls.Config
212+ ConnectTimeout time.Duration
213+ ReadTimeout time.Duration
214+ HdrRdTimeout time.Duration
215+ WriteTimeout time.Duration
216+ IdleTimeout time.Duration
217+ UpstreamTLSTimeout time.Duration
218+ UpstreamMaxIdleConns int
219+ UpstreamMaxConnsPerHost int
220+ WriteBufSize int
221+ ReadBufSize int
222+ FrontendAddr string
223+ BackendAddr string
224+ IgnoreUpstreamHostVerify bool
225+ NoKeepAlive bool
226+ NoCompression bool
227+}
228+
229+func NewProxyWithOptions(ctx context.Context, opts ProxyOptions) (*Proxy, error) {
230+ logger := zerolog.Ctx(ctx)
231+ errorLogger := stdlibLog.New(logger, "http-proxy", stdlibLog.LstdFlags|stdlibLog.LUTC) // wrap the existing logger in the stdlib log for http.Server
232+ var clientTLSCfg *tls.Config
233+ if opts.IgnoreUpstreamHostVerify {
234+ clientTLSCfg = &tls.Config{
235+ InsecureSkipVerify: opts.IgnoreUpstreamHostVerify,
236+ }
237+ }
238+ if len(opts.BackendAddr) == 0 {
239+ opts.BackendAddr = "0.0.0.0"
240+ }
241+ backendAddr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort(opts.BackendAddr, "0"))
242+ if err != nil {
243+ return nil, err
244+ }
245+ frontendAddr, err := net.ResolveTCPAddr("tcp", opts.FrontendAddr)
246+ if err != nil {
247+ return nil, err
248+ }
249+ httpDialer := &net.Dialer{
250+ Timeout: opts.ConnectTimeout,
251+ LocalAddr: backendAddr,
252+ Cancel: ctx.Done(),
253+ }
254+ if opts.NoKeepAlive {
255+ httpDialer.KeepAlive = -1
256+ }
257+ httpsDialer := &tls.Dialer{
258+ NetDialer: httpDialer,
259+ Config: clientTLSCfg,
260+ }
261+ if opts.HTTP2Enabled {
262+ opts.FrontendTLSCfg.NextProtos = []string{"h2"}
263+ }
264+ p := &Proxy{
265+ srvr: &http.Server{
266+ Addr: opts.FrontendAddr,
267+ TLSConfig: opts.FrontendTLSCfg,
268+ ReadTimeout: opts.ReadTimeout,
269+ ReadHeaderTimeout: opts.HdrRdTimeout,
270+ WriteTimeout: opts.WriteTimeout,
271+ IdleTimeout: opts.IdleTimeout,
272+ ErrorLog: errorLogger,
273+ BaseContext: func(_ net.Listener) context.Context {
274+ return ctx
275+ },
276+ },
277+ client: &http.Client{
278+ Transport: &http.Transport{
279+ Proxy: nil,
280+ DialContext: httpDialer.DialContext,
281+ DialTLSContext: httpsDialer.DialContext,
282+ TLSClientConfig: clientTLSCfg,
283+ TLSHandshakeTimeout: opts.UpstreamTLSTimeout,
284+ DisableKeepAlives: opts.NoKeepAlive,
285+ DisableCompression: opts.NoCompression,
286+ MaxIdleConns: opts.UpstreamMaxIdleConns,
287+ MaxConnsPerHost: opts.UpstreamMaxConnsPerHost,
288+ IdleConnTimeout: opts.IdleTimeout,
289+ ResponseHeaderTimeout: opts.HdrRdTimeout,
290+ MaxResponseHeaderBytes: int64(opts.MaxHeaderSize),
291+ WriteBufferSize: opts.WriteBufSize,
292+ ReadBufferSize: opts.ReadBufSize,
293+ ForceAttemptHTTP2: opts.HTTP2Enabled,
294+ },
295+ },
296+ opts: &opts,
297+ selfAddr: frontendAddr,
298+ }
299+ p.srvr.Handler = p
300+ return p, nil
301+}
302+
303+func (p *Proxy) Name() string {
304+ return "http_proxy"
305+}
306+
307+func (p *Proxy) Type() int {
308+ return service.SvcPROXY
309+}
310+
311+func (p *Proxy) PID() int {
312+ return os.Getpid()
313+}
314+
315+func (p *Proxy) Start(ctx context.Context) (err error) {
316+ if !p.enabled {
317+ return nil
318+ }
319+ var listener net.Listener
320+ if p.opts.FrontendTLSCfg != nil {
321+ listener, err = tls.Listen("tcp", p.opts.FrontendAddr, p.opts.FrontendTLSCfg)
322+ } else {
323+ listener, err = net.Listen("tcp", p.opts.FrontendAddr)
324+ }
325+ if err != nil {
326+ return err
327+ }
328+ p.getCtx = func() context.Context {
329+ return ctx
330+ }
331+ go p.srvr.Serve(listener)
332+ return nil
333+}
334+
335+func (p *Proxy) Stop(ctx context.Context) error {
336+ return p.srvr.Shutdown(ctx)
337+}
338+
339+func (p *Proxy) checkForCircularReq(r *http.Request) (err error) {
340+ port := r.URL.Port()
341+ if len(port) == 0 {
342+ if r.URL.Scheme == "https" {
343+ port = "443"
344+ } else {
345+ port = "80"
346+ }
347+ }
348+ var ips []net.IP
349+ ip := net.ParseIP(r.URL.Hostname())
350+ if ip == nil {
351+ ips, err = net.DefaultResolver.LookupIP(p.getCtx(), "ip", r.URL.Hostname())
352+ if err != nil {
353+ return err
354+ }
355+ } else {
356+ ips = []net.IP{ip}
357+ }
358+ for _, iaddr := range ips {
359+ remoteAddr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort(iaddr.String(), port))
360+ if err != nil {
361+ return err
362+ }
363+ if remoteAddr.String() == p.selfAddr.String() {
364+ return ErrCircularRequest
365+ }
366+ }
367+ return nil
368+}
369+
370+func (p *Proxy) setForwardingHeaders(h http.Header, req *http.Request) {
371+ h.Set("X-Forwarded-For", req.RemoteAddr)
372+ h.Set("X-Forwarded-Host", req.Host)
373+ h.Set("Host", req.Host)
374+ h.Set("Forwarded", fmt.Sprintf("by=%s", p.selfAddr.String()))
375+ h.Add("Forwarded", fmt.Sprintf("for=%s", req.RemoteAddr))
376+ h.Add("Forwarded", fmt.Sprintf("host=%s", req.Host))
377+ h.Add("Forwarded", fmt.Sprintf("proto=%s", req.URL.Scheme))
378+ h.Set("Origin", fmt.Sprintf("%s://%s", req.URL.Scheme, req.URL.Host))
379+}
380+
381+func (p *Proxy) handleNetError(ctx context.Context, req *http.Request, w http.ResponseWriter, err error) {
382+ if err == net.ErrClosed || err == syscall.ECONNREFUSED {
383+ w.WriteHeader(http.StatusServiceUnavailable)
384+ return
385+ }
386+ if err == syscall.ECONNRESET || err == syscall.EHOSTUNREACH || err == syscall.EIO || err == syscall.EPIPE {
387+ w.WriteHeader(http.StatusBadGateway)
388+ return
389+ }
390+ if timeout, ok := err.(net.Error); (ok && timeout.Timeout()) || err == syscall.ETIMEDOUT {
391+ w.WriteHeader(http.StatusGatewayTimeout)
392+ return
393+ }
394+ w.WriteHeader(http.StatusInternalServerError)
395+}
396+
397+func (p *Proxy) handleResponseError(ctx context.Context, req *http.Request, resp *http.Response, w http.ResponseWriter, err error) {
398+ logger := zerolog.Ctx(ctx)
399+ if resp == nil {
400+ logger.Err(err).Msgf("no response received")
401+ p.handleNetError(ctx, req, w, err)
402+ return
403+ }
404+ logger.Err(err).Msgf("forward error to: %s, status %d", req.URL.RequestURI(), resp.StatusCode)
405+ switch resp.StatusCode {
406+ case 500, 400, 401, 402, 403, 404, 406, 409, 410, 411, 412, 413, 414, 415, 416, 417, 422, 423, 424, 425, 428, 431, 451, 501, 507, 510, 511:
407+ w.WriteHeader(resp.StatusCode)
408+ return
409+ case 502, 503, 504, 505, 506:
410+ w.WriteHeader(http.StatusBadGateway)
411+ return
412+ default:
413+ p.handleNetError(ctx, req, w, err)
414+ }
415+}
416+
417+func (p *Proxy) PassResponseHeaders(w http.ResponseWriter, resp *http.Response) {
418+ for k, v := range resp.Header {
419+ w.Header().Set(k, v[0])
420+ if len(v) > 1 {
421+ for _, val := range v[1:] {
422+ w.Header().Add(k, val)
423+ }
424+ }
425+ }
426+}
427+
428+func (p *Proxy) checkIsInternal(r *http.Request) error {
429+ hostPortSlice := strings.Split(r.RemoteAddr, ":")
430+ remoteHost, _ := hostPortSlice[0], hostPortSlice[1]
431+ for _, internalHost := range []string{"localhost", "127.0.0.1", "::1"} {
432+ if remoteHost == internalHost {
433+ return nil
434+ }
435+ }
436+ return ErrNotInternalAddress
437+}
438+
439+func (p *Proxy) checkIPAllowed(remoteAddr string) error {
440+ remoteHost, _, err := net.SplitHostPort(remoteAddr)
441+ if err != nil {
442+ return err
443+ }
444+ remoteIP := net.ParseIP(remoteHost)
445+ _, localNet, err := net.ParseCIDR("127.0.0.0/8")
446+ if err != nil {
447+ return err
448+ }
449+ for _, allowed := range append(p.allowedCidrs, localNet) {
450+ if !allowed.Contains(remoteIP) {
451+ return ErrClientNotAllowed
452+ }
453+ }
454+ return nil
455+}
456+
457+func (p *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
458+ logger := zerolog.Ctx(p.getCtx())
459+ defer r.Body.Close()
460+ err := p.checkIPAllowed(r.RemoteAddr)
461+ if err != nil {
462+ w.WriteHeader(http.StatusForbidden)
463+ return
464+ }
465+ fwdURL := r.URL.String()
466+ if len(r.URL.Scheme) == 0 {
467+ if r.URL.Port() == "443" {
468+ fwdURL = "https:" + fwdURL
469+ } else {
470+ fwdURL = "http:" + fwdURL
471+ }
472+ }
473+ logger.Debug().Msgf("%s requesting %s %s", r.RemoteAddr, r.Method, fwdURL)
474+ err = p.checkForCircularReq(r)
475+ if err != nil {
476+ w.WriteHeader(http.StatusLoopDetected)
477+ return
478+ }
479+ fwdReq, err := http.NewRequestWithContext(p.getCtx(), r.Method, fwdURL, r.Body)
480+ if err != nil {
481+ w.WriteHeader(http.StatusInternalServerError)
482+ return
483+ }
484+ fwdReq.Header = r.Header.Clone()
485+ p.setForwardingHeaders(fwdReq.Header, r)
486+ resp, err := p.client.Do(fwdReq)
487+ if err != nil {
488+ p.handleResponseError(p.getCtx(), r, resp, w, err)
489+ return
490+ }
491+ defer logger.Debug().Msgf("%s responding to %s, %s", fwdURL, r.RemoteAddr, resp.Status)
492+ defer resp.Body.Close()
493+ if resp.StatusCode >= 400 {
494+ p.handleResponseError(p.getCtx(), r, resp, w, nil)
495+ return
496+ }
497+ p.PassResponseHeaders(w, resp)
498+ w.WriteHeader(resp.StatusCode)
499+ _, err = io.Copy(w, resp.Body)
500+ if err != nil {
501+ logger := zerolog.Ctx(p.getCtx())
502+ logger.Err(err).Msgf("writing body to %s", r.RemoteAddr)
503+ }
504+}
505+
506+func (p *Proxy) Restart(ctx context.Context) error {
507+ err := p.Stop(ctx)
508+ if err != nil {
509+ return err
510+ }
511+ return p.Start(ctx)
512+}
513+
514+func (p *Proxy) Status(_ context.Context) error {
515+ return nil
516+}
517+
518+func (p *Proxy) Configure(ctx context.Context, enabled, preferV4 bool, port int16, allowedCidrs []*net.IPNet) (err error) {
519+ p.Lock()
520+ defer p.Unlock()
521+ p.enabled = enabled
522+ // TODO resolve DNS for IPv4 first if preferV4
523+
524+ p.opts.FrontendAddr = net.JoinHostPort(p.selfAddr.IP.String(), strconv.Itoa(int(port)))
525+ p.selfAddr, err = net.ResolveTCPAddr("tcp", p.opts.FrontendAddr)
526+ if err != nil {
527+ return err
528+ }
529+ p.allowedCidrs = allowedCidrs
530+ return p.Restart(ctx)
531+}
532diff --git a/src/rackd_spike/internal/http/reverse.go b/src/rackd_spike/internal/http/reverse.go
533new file mode 100644
534index 0000000..53b801b
535--- /dev/null
536+++ b/src/rackd_spike/internal/http/reverse.go
537@@ -0,0 +1,234 @@
538+package http
539+
540+import (
541+ "context"
542+ "crypto/tls"
543+ "errors"
544+ "fmt"
545+ "io"
546+ "net"
547+ "net/http"
548+ "net/url"
549+ "os"
550+ "path/filepath"
551+ "strconv"
552+ "sync"
553+
554+ "github.com/rs/zerolog"
555+
556+ machinehelpers "rackd/internal/machine_helpers"
557+ "rackd/internal/service"
558+)
559+
560+var (
561+ ErrNoUpstreams = errors.New("no upstreams available to proxy")
562+)
563+
564+type RevProxyService interface {
565+ service.Service
566+ Configure(context.Context, []string) error
567+}
568+
569+type ReverseProxy struct {
570+ sync.Mutex
571+ Proxy
572+ upstreamRegions []*net.TCPAddr
573+ resourcePath string
574+}
575+
576+func NewReverseProxy(ctx context.Context, machineResourcePath, bindAddr string, port int) (*ReverseProxy, error) {
577+ return NewReverseProxyWithTLS(ctx, machineResourcePath, bindAddr, port, nil)
578+}
579+
580+func NewReverseProxyWithTLS(ctx context.Context, machineResourcePath, bindAddr string, port int, tlsCfg *tls.Config) (*ReverseProxy, error) {
581+ addr := net.JoinHostPort(bindAddr, strconv.Itoa(port))
582+ return NewReverseProxyWithOptions(ctx, machineResourcePath, ProxyOptions{
583+ FrontendAddr: addr,
584+ FrontendTLSCfg: tlsCfg,
585+ })
586+}
587+
588+func NewReverseProxyWithOptions(ctx context.Context, machineResourcePath string, opts ProxyOptions) (*ReverseProxy, error) {
589+ p, err := NewProxyWithOptions(ctx, opts)
590+ if err != nil {
591+ return nil, err
592+ }
593+ r := &ReverseProxy{
594+ Proxy: *p,
595+ }
596+ mux := http.NewServeMux()
597+ mux.HandleFunc("/MAAS/", r.ServeMAAS)
598+ mux.Handle("/machine-resources/", http.FileServer(http.FS(os.DirFS(machineResourcePath))))
599+ mux.HandleFunc("/images/", r.ServeImages)
600+ mux.HandleFunc("/log", r.ServeLog)
601+ mux.HandleFunc("/", r.ServeBoot)
602+ r.Proxy.srvr.Handler = mux
603+ return r, nil
604+}
605+
606+func (r *ReverseProxy) Name() string {
607+ return "http_reverse_proxy"
608+}
609+
610+func (p *ReverseProxy) pickRegionHost() (*net.TCPAddr, error) {
611+ p.Lock()
612+ defer p.Unlock()
613+ if len(p.upstreamRegions) == 0 {
614+ return nil, ErrNoUpstreams
615+ }
616+ region := p.upstreamRegions[0]
617+ if len(p.upstreamRegions) > 1 {
618+ p.upstreamRegions = append(p.upstreamRegions[1:], region) // place the selected at end of list for round robin-ing
619+ }
620+ return region, nil
621+}
622+
623+func (p *ReverseProxy) proxyPass(ctx context.Context, fwdURL string, w http.ResponseWriter, r *http.Request) {
624+ logger := zerolog.Ctx(ctx)
625+ req, err := http.NewRequestWithContext(ctx, r.Method, fwdURL, r.Body)
626+ if err != nil {
627+ w.WriteHeader(http.StatusInternalServerError)
628+ return
629+ }
630+ req.Header = r.Header.Clone()
631+ p.setForwardingHeaders(req.Header, r)
632+ resp, err := p.client.Do(req)
633+ if err != nil {
634+ p.handleResponseError(ctx, r, resp, w, err)
635+ return
636+ }
637+ if resp.StatusCode >= 400 {
638+ p.handleResponseError(ctx, r, resp, w, nil)
639+ return
640+ }
641+ p.PassResponseHeaders(w, resp)
642+ w.WriteHeader(resp.StatusCode)
643+ _, err = io.Copy(w, resp.Body)
644+ if err != nil {
645+ logger.Err(err).Msgf("writing body to %s", r.RemoteAddr)
646+ }
647+}
648+
649+func (p *ReverseProxy) ServeMAAS(w http.ResponseWriter, r *http.Request) {
650+ ctx := p.getCtx()
651+ defer r.Body.Close()
652+ regionAddr, err := p.pickRegionHost()
653+ if err != nil {
654+ w.WriteHeader(http.StatusBadGateway)
655+ return
656+ }
657+ fwdURL := "http://" + regionAddr.String() + r.URL.RequestURI()
658+ p.proxyPass(ctx, fwdURL, w, r)
659+}
660+
661+func (p *ReverseProxy) ServeImages(w http.ResponseWriter, r *http.Request) {
662+ ctx := p.getCtx()
663+ authReq, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("http://"+p.opts.FrontendAddr+"/log"), http.NoBody)
664+ if err != nil {
665+ w.WriteHeader(http.StatusInternalServerError)
666+ return
667+ }
668+ resp, err := p.client.Do(authReq)
669+ if err != nil {
670+ p.handleResponseError(ctx, r, resp, w, err)
671+ return
672+ }
673+ if resp.StatusCode >= 400 {
674+ p.handleResponseError(ctx, r, resp, w, nil)
675+ return
676+ }
677+ resource := filepath.Join(p.resourcePath, r.URL.Path)
678+ f, err := os.Open(resource)
679+ if err != nil {
680+ if err == os.ErrNotExist {
681+ w.WriteHeader(http.StatusNotFound)
682+ return
683+ }
684+ if err == os.ErrPermission {
685+ w.WriteHeader(http.StatusForbidden)
686+ return
687+ }
688+ w.WriteHeader(http.StatusInternalServerError)
689+ return
690+ }
691+ defer f.Close()
692+ w.WriteHeader(http.StatusOK)
693+ _, err = io.Copy(w, f)
694+ if err != nil {
695+ logger := zerolog.Ctx(ctx)
696+ logger.Err(err).Msgf("writing body to %s", r.RemoteAddr)
697+ }
698+}
699+
700+func (p *ReverseProxy) ServeLog(w http.ResponseWriter, r *http.Request) {
701+ ctx := p.getCtx()
702+ defer r.Body.Close()
703+ err := p.checkIsInternal(r)
704+ if err != nil {
705+ w.WriteHeader(http.StatusForbidden)
706+ return
707+ }
708+ region, err := p.pickRegionHost()
709+ if err != nil {
710+ w.WriteHeader(http.StatusBadGateway)
711+ return
712+ }
713+ regionLogURL := "http://" + net.JoinHostPort(region.IP.String(), "5249") + r.URL.RequestURI()
714+ req, err := http.NewRequestWithContext(ctx, r.Method, regionLogURL, r.Body)
715+ if err != nil {
716+ w.WriteHeader(http.StatusInternalServerError)
717+ return
718+ }
719+ req.Header = r.Header.Clone()
720+ req.Header.Set("X-Original-URI", r.URL.String())
721+ req.Header.Set("X-Original-Remote-IP", r.RemoteAddr)
722+ resp, err := p.client.Do(req)
723+ if err != nil {
724+ p.handleResponseError(ctx, r, resp, w, err)
725+ return
726+ }
727+ if resp.StatusCode >= 400 {
728+ p.handleResponseError(ctx, r, resp, w, nil)
729+ return
730+ }
731+ p.PassResponseHeaders(w, resp)
732+ w.WriteHeader(resp.StatusCode)
733+ _, err = io.Copy(w, r.Body)
734+ if err != nil {
735+ logger := zerolog.Ctx(ctx)
736+ logger.Err(err).Msgf("writing body to %s", r.RemoteAddr)
737+ }
738+}
739+
740+func (p *ReverseProxy) ServeBoot(w http.ResponseWriter, r *http.Request) {
741+ ctx := p.getCtx()
742+ region, err := p.pickRegionHost()
743+ if err != nil {
744+ w.WriteHeader(http.StatusBadGateway)
745+ return
746+ }
747+ fwdURL := "http://" + net.JoinHostPort(region.IP.String(), "5249") + r.URL.RequestURI()
748+ p.proxyPass(ctx, fwdURL, w, r)
749+}
750+
751+func (p *ReverseProxy) Configure(ctx context.Context, regions []string) (err error) {
752+ p.resourcePath, err = machinehelpers.GetResourcesBinPath()
753+ if err != nil {
754+ return err
755+ }
756+ p.upstreamRegions = make([]*net.TCPAddr, len(regions))
757+ for i, region := range regions {
758+ regionURL, err := url.Parse(region)
759+ if err != nil { // if not url, try as host or IP
760+ p.upstreamRegions[i], err = net.ResolveTCPAddr("tcp", net.JoinHostPort(region, "5240"))
761+ if err != nil {
762+ return err
763+ }
764+ }
765+ p.upstreamRegions[i], err = net.ResolveTCPAddr("tcp", regionURL.Host)
766+ if err != nil {
767+ return err
768+ }
769+ }
770+ return p.Restart(ctx)
771+}
772diff --git a/src/rackd_spike/internal/service/supervisor.go b/src/rackd_spike/internal/service/supervisor.go
773index 09ba25e..ca9bd36 100644
774--- a/src/rackd_spike/internal/service/supervisor.go
775+++ b/src/rackd_spike/internal/service/supervisor.go
776@@ -24,6 +24,8 @@ var (
777 ErrInvalidServiceState = errors.New("service is in an invalid state for this operation")
778 ErrInvalidServiceType = errors.New("invalid service type for given service")
779 ErrUnexpectedServiceExit = errors.New("service exited unexpectedly")
780+ ErrUnsuccessfulStart = errors.New("not all services started correctly")
781+ ErrUnsuccessfulStop = errors.New("not all services shutdown correctly")
782 )
783
784 // Service is an interface outlining behavior to manage external services
785diff --git a/src/rackd_spike/internal/transport/rpc.go b/src/rackd_spike/internal/transport/rpc.go
786index 4298c93..b71cdc8 100644
787--- a/src/rackd_spike/internal/transport/rpc.go
788+++ b/src/rackd_spike/internal/transport/rpc.go
789@@ -198,3 +198,14 @@ func (r *RPCManager) GetHandler(handlerName string) (RPCHandler, error) {
790 }
791 return h, nil
792 }
793+
794+func (r *RPCManager) ConnsToString() []string {
795+ res := make([]string, len(r.conns))
796+
797+ var idx int
798+ for k := range r.conns { // map key should be remote address of conn
799+ res[idx] = k
800+ idx++
801+ }
802+ return res
803+}
804diff --git a/src/rackd_spike/pkg/http/proxy.go b/src/rackd_spike/pkg/http/proxy.go
805new file mode 100644
806index 0000000..caeede0
807--- /dev/null
808+++ b/src/rackd_spike/pkg/http/proxy.go
809@@ -0,0 +1,110 @@
810+package http
811+
812+import (
813+ "context"
814+ "net"
815+ internal "rackd/internal/http"
816+ "rackd/internal/metrics"
817+ "rackd/internal/service"
818+ "rackd/internal/transport"
819+ "rackd/pkg/rpc"
820+ "sync"
821+)
822+
823+type Client interface {
824+ transport.RPCClient
825+ GetProxyConfiguration(context.Context, string) error
826+}
827+
828+type CapnpClient struct {
829+ sync.Mutex
830+ svc internal.ProxyService
831+ clients map[string]*rpc.RegionController
832+ lastGoodClient string
833+ needReset bool
834+}
835+
836+func New(sup service.SvcManager) (Client, error) {
837+ svcs, err := sup.GetByType(service.SvcPROXY)
838+ if err != nil {
839+ return nil, err
840+ }
841+ svc, ok := svcs[0].(internal.ProxyService)
842+ if !ok {
843+
844+ }
845+ return &CapnpClient{
846+ svc: svc,
847+ clients: make(map[string]*rpc.RegionController),
848+ }, nil
849+}
850+
851+func (c *CapnpClient) Name() string {
852+ return "http"
853+}
854+
855+func (c *CapnpClient) RegisterMetrics(registry *metrics.Registry) error {
856+ // TODO
857+ return nil
858+}
859+
860+func (c *CapnpClient) SetupClient(ctx context.Context, client *transport.ConnWrapper) {
861+ c.Lock()
862+ defer c.Unlock()
863+ c.clients[client.Conn.RemoteAddr().String()] = client.Capnp()
864+}
865+
866+func (c *CapnpClient) getClient(reset bool) (*rpc.RegionController, error) {
867+ c.Lock()
868+ defer c.Unlock()
869+ if len(c.clients) == 0 {
870+ return nil, transport.ErrRPCClientNotFound
871+ }
872+ if !reset && len(c.lastGoodClient) > 0 {
873+ return c.clients[c.lastGoodClient], nil
874+ }
875+ for k, v := range c.clients {
876+ c.lastGoodClient = k
877+ return v, nil
878+ }
879+ return nil, transport.ErrRPCClientNotFound
880+}
881+
882+func (c *CapnpClient) GetProxyConfiguration(ctx context.Context, systemID string) error {
883+ client, err := c.getClient(c.needReset)
884+ if err != nil {
885+ return err
886+ }
887+ resp, release := client.GetProxyConfiguration(ctx, func(params rpc.RegionController_getProxyConfiguration_Params) error {
888+ return params.SetSystemId(systemID)
889+ })
890+ defer release()
891+ cfgResp, err := resp.Struct()
892+ if err != nil {
893+ return err
894+ }
895+ cfg, err := cfgResp.ProxyConfig()
896+ if err != nil {
897+ return err
898+ }
899+ enabled := cfg.Enabled()
900+ preferV4 := cfg.PreferV4Proxy()
901+ port := cfg.Port()
902+ allowedCidrsProto, err := cfg.AllowedCidrs()
903+ if err != nil {
904+ return err
905+ }
906+ allowedCidrs := make([]*net.IPNet, allowedCidrsProto.Len())
907+ for i := 0; i < allowedCidrsProto.Len(); i++ {
908+ cidrProto, err := allowedCidrsProto.At(i)
909+ if err != nil {
910+ return err
911+ }
912+ _, cidr, err := net.ParseCIDR(cidrProto)
913+ if err != nil {
914+ return err
915+ }
916+ allowedCidrs[i] = cidr
917+ }
918+ return c.svc.Configure(ctx, enabled, preferV4, port, allowedCidrs)
919+}
920diff --git a/src/rackd_spike/pkg/region/config.go b/src/rackd_spike/pkg/region/config.go
921index 2ad76a9..c199c91 100644
922--- a/src/rackd_spike/pkg/region/config.go
923+++ b/src/rackd_spike/pkg/region/config.go
924@@ -3,19 +3,53 @@ package region
925 import (
926 "context"
927
928+ "github.com/rs/zerolog"
929+
930 "rackd/internal/config"
931+ httpInternal "rackd/internal/http"
932+ "rackd/internal/service"
933 "rackd/internal/transport"
934+ "rackd/pkg/http"
935 "rackd/pkg/ntp"
936 )
937
938-func GetRemoteConfig(ctx context.Context, rpcMgr *transport.RPCManager) error {
939+func GetRemoteConfig(ctx context.Context, rpcMgr *transport.RPCManager, sup service.SvcManager) error {
940+ log := zerolog.Ctx(ctx)
941+
942 ntpIface, err := rpcMgr.GetClient("ntp")
943 if err != nil {
944 return err
945 }
946- if ntp, ok := ntpIface.(ntp.Client); ok {
947- return ntp.GetTimeConfiguration(ctx, config.Config.SystemID)
948+
949+ proxyIface, err := rpcMgr.GetClient("proxy")
950+ if err != nil {
951+ return err
952+ }
953+
954+ revProxyIface, err := sup.Get("http_reverse_proxy")
955+ if err != nil {
956+ return err
957+ }
958+
959+ if n, ok := ntpIface.(ntp.Client); ok {
960+ log.Info().Msg("configuring NTP")
961+ err = n.GetTimeConfiguration(ctx, config.Config.SystemID)
962+ if err != nil {
963+ return err
964+ }
965 }
966+ if proxy, ok := proxyIface.(http.Client); ok {
967+ log.Info().Msg("configuring proxy")
968+ err = proxy.GetProxyConfiguration(ctx, config.Config.SystemID)
969+ if err != nil {
970+ return err
971+ }
972+ }
973+ if revProxy, ok := revProxyIface.(httpInternal.RevProxyService); ok {
974+ log.Info().Msg("configuring HTTP reverse proxy")
975+ revProxy.Configure(ctx, rpcMgr.ConnsToString())
976+ }
977+
978 return nil
979 // TODO add other services that fetch config from region controller
980 }

Subscribers

People subscribed via source and target branches