Merge ~cgrabowski/maas:rack_spike_http into maas:rack_region_exploration
- Git
- lp:~cgrabowski/maas
- rack_spike_http
- Merge into rack_region_exploration
Status: | Merged |
---|---|
Approved by: | Christian Grabowski |
Approved revision: | dd8d76c96c704e489dadb54592f89f75baf83c3c |
Merge reported by: | MAAS Lander |
Merged at revision: | not available |
Proposed branch: | ~cgrabowski/maas:rack_spike_http |
Merge into: | maas:rack_region_exploration |
Diff against target: |
980 lines (+828/-10) 9 files modified
src/maasserver/rpc/capnp/region.py (+26/-4) src/rackd_spike/cmd/rackd.go (+24/-1) src/rackd_spike/go-lint.sh (+3/-2) src/rackd_spike/internal/http/proxy.go (+381/-0) src/rackd_spike/internal/http/reverse.go (+234/-0) src/rackd_spike/internal/service/supervisor.go (+2/-0) src/rackd_spike/internal/transport/rpc.go (+11/-0) src/rackd_spike/pkg/http/proxy.go (+110/-0) src/rackd_spike/pkg/region/config.go (+37/-3) |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
MAAS Lander | Approve | ||
Alexsander de Souza | Approve | ||
Review via email: mp+406257@code.launchpad.net |
Commit message
fix build
fix authentication
fix existing services
close connection on error
start supervisor earlier
add get proxy call
configure http services
add reverse proxy
make https urls work
quiet down logs
successfully proxy http
fix existing services
forward requests
add http proxy
Description of the change
- 54e4f18... by Christian Grabowski
-
fix linter issues
MAAS Lander (maas-lander) wrote : | # |
UNIT TESTS
-b rack_spike_http lp:~cgrabowski/maas/+git/maas into -b rack_region_
STATUS: SUCCESS
COMMIT: 61116646d017559
- 30c08a9... by Christian Grabowski
-
use events set in other call
MAAS Lander (maas-lander) wrote : | # |
UNIT TESTS
-b rack_spike_http lp:~cgrabowski/maas/+git/maas into -b rack_region_
STATUS: SUCCESS
COMMIT: 3c09c61450b62d1
- 581634f... by Christian Grabowski
-
initialize allowed_cidrs list first
MAAS Lander (maas-lander) wrote : | # |
UNIT TESTS
-b rack_spike_http lp:~cgrabowski/maas/+git/maas into -b rack_region_
STATUS: SUCCESS
COMMIT: 581634f038b336a
Alexsander de Souza (alexsander-souza) wrote : | # |
I think there's a typo, just fix it before landing this
- dd8d76c... by Christian Grabowski
-
fix typo
Christian Grabowski (cgrabowski) : | # |
MAAS Lander (maas-lander) wrote : | # |
UNIT TESTS
-b rack_spike_http lp:~cgrabowski/maas/+git/maas into -b rack_region_
STATUS: SUCCESS
COMMIT: dd8d76c96c704e4
Preview Diff
1 | diff --git a/src/maasserver/rpc/capnp/region.py b/src/maasserver/rpc/capnp/region.py |
2 | index ae8cb64..cbc5f05 100644 |
3 | --- a/src/maasserver/rpc/capnp/region.py |
4 | +++ b/src/maasserver/rpc/capnp/region.py |
5 | @@ -105,6 +105,7 @@ class RegionController(region.RegionController.Server): |
6 | self.shim = RegionServer() |
7 | self.rack_controllers = rack_controllers |
8 | self.server = server |
9 | + self.events = set() |
10 | super(RegionController, self).__init__() |
11 | |
12 | def ping(self): |
13 | @@ -183,7 +184,8 @@ class RegionController(region.RegionController.Server): |
14 | return None |
15 | |
16 | def getTimeConfiguration_context(self, context): |
17 | - self.event = capnp.PromiseFulfillerPair() |
18 | + event = capnp.PromiseFulfillerPair() |
19 | + self.events.add(event) |
20 | systemId = context.params.systemId |
21 | |
22 | def get_result(cfg): |
23 | @@ -200,7 +202,8 @@ class RegionController(region.RegionController.Server): |
24 | lst[i] = s |
25 | |
26 | context.results.resp = resp |
27 | - self.event.fulfill() |
28 | + event.fulfill() |
29 | + self.events.remove(event) |
30 | |
31 | self.shim.get_time_configuration(systemId).addCallback(get_result) |
32 | return self.event.promise |
33 | @@ -216,8 +219,27 @@ class RegionController(region.RegionController.Server): |
34 | self.shim.get_dns_configuration(systemId).addCallback(get_result) |
35 | return prom |
36 | |
37 | - def getProxyConfiguration(self, systemId): |
38 | - return None |
39 | + def getProxyConfiguration_context(self, context): |
40 | + system_id = context.params.systemId |
41 | + event = capnp.PromiseFulfillerPair() |
42 | + self.events.add(event) |
43 | + |
44 | + def get_results(cfg): |
45 | + resp = controller.ProxyConfiguration() |
46 | + resp.enabled = cfg.get("enabled", True) |
47 | + resp.port = cfg.get("port") |
48 | + allowed_cidrs = cfg.get("allowed_cidrs", []) |
49 | + lst = resp.init("allowed_cidrs", len(allowed_cidrs)) |
50 | + for i, cidr in enumerate(allowed_cidrs): |
51 | + lst[i] = cidr |
52 | + resp.allowedCidrs = lst |
53 | + resp.preferV4Proxy = cfg.get("prefer_v4_proxy", False) |
54 | + context.results.proxyConfig = resp |
55 | + event.fulfill() |
56 | + self.events.remove(event) |
57 | + |
58 | + self.shim.get_proxy_configuration(system_id).addCallback(get_results) |
59 | + return event.promise |
60 | |
61 | def getSyslogConfiguration(self, systemId): |
62 | return None |
63 | diff --git a/src/rackd_spike/cmd/rackd.go b/src/rackd_spike/cmd/rackd.go |
64 | index 5091571..1dd491c 100644 |
65 | --- a/src/rackd_spike/cmd/rackd.go |
66 | +++ b/src/rackd_spike/cmd/rackd.go |
67 | @@ -15,6 +15,7 @@ import ( |
68 | "rackd/cmd/subcommands" |
69 | "rackd/internal/config" |
70 | "rackd/internal/dhcp" |
71 | + "rackd/internal/http" |
72 | machinehelpers "rackd/internal/machine_helpers" |
73 | "rackd/internal/metrics" |
74 | "rackd/internal/ntp" |
75 | @@ -22,6 +23,7 @@ import ( |
76 | "rackd/internal/transport" |
77 | "rackd/pkg/authenticate" |
78 | "rackd/pkg/controller" |
79 | + httprpc "rackd/pkg/http" |
80 | ntprpc "rackd/pkg/ntp" |
81 | "rackd/pkg/region" |
82 | "rackd/pkg/register" |
83 | @@ -77,8 +79,22 @@ func registerProxyServices(ctx context.Context, sup service.SvcManager) error { |
84 | if err != nil { |
85 | return err |
86 | } |
87 | + httpProxy, err := http.NewProxy(ctx, "0.0.0.0", 80) |
88 | + if err != nil { |
89 | + return err |
90 | + } |
91 | + machineResourcesPath, err := machinehelpers.GetResourcesBinPath() |
92 | + if err != nil { |
93 | + return err |
94 | + } |
95 | + reverseProxy, err := http.NewReverseProxy(ctx, machineResourcesPath, "0.0.0.0", 5248) |
96 | + if err != nil { |
97 | + return err |
98 | + } |
99 | sup.RegisterService(dhcp.NewRelaySvc()) |
100 | sup.RegisterService(ntpProxy) |
101 | + sup.RegisterService(httpProxy) |
102 | + sup.RegisterService(reverseProxy) |
103 | return nil |
104 | } |
105 | |
106 | @@ -190,6 +206,13 @@ func runRoot(cmd *cobra.Command, args []string) error { |
107 | return err |
108 | } |
109 | rpcMgr.AddClient(ctx, ntpClient) |
110 | + |
111 | + proxyClient, err := httprpc.New(sup) |
112 | + if err != nil { |
113 | + return err |
114 | + } |
115 | + rpcMgr.AddClient(ctx, proxyClient) |
116 | + |
117 | rackController, err := controller.NewRackController(ctx, true, initRegion, sup) |
118 | if err != nil { |
119 | return err |
120 | @@ -201,7 +224,7 @@ func runRoot(cmd *cobra.Command, args []string) error { |
121 | if err != nil { |
122 | return err |
123 | } |
124 | - err = region.GetRemoteConfig(ctx, rpcMgr) |
125 | + err = region.GetRemoteConfig(ctx, rpcMgr, sup) |
126 | if err != nil { |
127 | return err |
128 | } |
129 | diff --git a/src/rackd_spike/go-lint.sh b/src/rackd_spike/go-lint.sh |
130 | index 6506442..c0e86ae 100755 |
131 | --- a/src/rackd_spike/go-lint.sh |
132 | +++ b/src/rackd_spike/go-lint.sh |
133 | @@ -5,7 +5,8 @@ go vet ./... 2>&1 | grep -v \ |
134 | -e 'internal/dhcp/service.go:46' \ |
135 | -e 'internal/dhcp/service.go:74' \ |
136 | -e 'internal/dhcp/service.go:102' \ |
137 | - -e 'internal/dhcp/service.go:130' |
138 | + -e 'internal/dhcp/service.go:130' \ |
139 | + -e 'internal/http/reverse.go:57' |
140 | |
141 | # invert grep result |
142 | -[ "$?" -ne "0" ] |
143 | \ No newline at end of file |
144 | +[ "$?" -ne "0" ] |
145 | diff --git a/src/rackd_spike/internal/http/proxy.go b/src/rackd_spike/internal/http/proxy.go |
146 | new file mode 100644 |
147 | index 0000000..69ff494 |
148 | --- /dev/null |
149 | +++ b/src/rackd_spike/internal/http/proxy.go |
150 | @@ -0,0 +1,381 @@ |
151 | +package http |
152 | + |
153 | +import ( |
154 | + "context" |
155 | + "crypto/tls" |
156 | + "errors" |
157 | + "fmt" |
158 | + "io" |
159 | + stdlibLog "log" |
160 | + "net" |
161 | + "net/http" |
162 | + "os" |
163 | + "strconv" |
164 | + "strings" |
165 | + "sync" |
166 | + "syscall" |
167 | + "time" |
168 | + |
169 | + "github.com/rs/zerolog" |
170 | + |
171 | + "rackd/internal/service" |
172 | +) |
173 | + |
var (
	// ErrCircularRequest is returned by checkForCircularReq when the
	// requested target resolves to the proxy's own frontend address.
	ErrCircularRequest = errors.New("error circular request")
	// ErrNotInternalAddress is returned by checkIsInternal when the remote
	// host of a request is not one of the local host names/addresses.
	ErrNotInternalAddress = errors.New("received external request on internal endpoint")
	// ErrClientNotAllowed is returned by checkIPAllowed when the client
	// address is rejected by the allowed CIDR check.
	ErrClientNotAllowed = errors.New("client not found in allowed CIDRs")
)
179 | + |
// ProxyService is a service.Service that can additionally be (re)configured
// with the proxy settings: whether it is enabled, whether IPv4 is preferred,
// the port to listen on and the client networks allowed to use it.
type ProxyService interface {
	service.Service
	Configure(ctx context.Context, enabled, preferV4 bool, port int16, allowedCidrs []*net.IPNet) error
}
184 | + |
// Proxy is an HTTP forward proxy: it accepts requests on its frontend server
// and re-issues them with its own HTTP client.
type Proxy struct {
	sync.Mutex
	srvr         *http.Server           // frontend server; its Handler is the Proxy itself
	client       *http.Client           // client used to forward requests upstream
	opts         *ProxyOptions          // options the proxy was built with; FrontendAddr is updated by Configure
	selfAddr     *net.TCPAddr           // resolved frontend address, compared against targets to detect loops
	enabled      bool                   // Start is a no-op while this is false; set by Configure
	allowedCidrs []*net.IPNet           // client networks consulted by checkIPAllowed
	getCtx       func() context.Context // returns the context given to the last Start call; nil before Start ran
}
195 | + |
196 | +func NewProxy(ctx context.Context, bindAddr string, port int) (*Proxy, error) { |
197 | + return NewProxyWithTLS(ctx, bindAddr, port, nil) |
198 | +} |
199 | + |
200 | +func NewProxyWithTLS(ctx context.Context, bindAddr string, port int, tlsCfg *tls.Config) (*Proxy, error) { |
201 | + addr := net.JoinHostPort(bindAddr, strconv.Itoa(port)) |
202 | + return NewProxyWithOptions(ctx, ProxyOptions{ |
203 | + FrontendAddr: addr, |
204 | + FrontendTLSCfg: tlsCfg, |
205 | + }) |
206 | +} |
207 | + |
// ProxyOptions collects the tunables NewProxyWithOptions uses to build the
// frontend http.Server and the upstream http.Client/http.Transport.
type ProxyOptions struct {
	MaxHeaderSize            int           // upstream transport MaxResponseHeaderBytes
	HTTP2Enabled             bool          // advertise "h2" on the frontend and force HTTP/2 attempts upstream
	FrontendTLSCfg           *tls.Config   // when non-nil the frontend listens with TLS
	ConnectTimeout           time.Duration // dial timeout for upstream connections
	ReadTimeout              time.Duration // frontend server ReadTimeout
	HdrRdTimeout             time.Duration // frontend ReadHeaderTimeout and upstream ResponseHeaderTimeout
	WriteTimeout             time.Duration // frontend server WriteTimeout
	IdleTimeout              time.Duration // frontend IdleTimeout and upstream IdleConnTimeout
	UpstreamTLSTimeout       time.Duration // upstream TLSHandshakeTimeout
	UpstreamMaxIdleConns     int           // upstream transport MaxIdleConns
	UpstreamMaxConnsPerHost  int           // upstream transport MaxConnsPerHost
	WriteBufSize             int           // upstream transport WriteBufferSize
	ReadBufSize              int           // upstream transport ReadBufferSize
	FrontendAddr             string        // host:port the frontend listens on
	BackendAddr              string        // local host used for upstream dials; "0.0.0.0" when empty
	IgnoreUpstreamHostVerify bool          // sets InsecureSkipVerify on the upstream TLS config
	NoKeepAlive              bool          // disables keep-alives on the dialer and the transport
	NoCompression            bool          // upstream transport DisableCompression
}
228 | + |
229 | +func NewProxyWithOptions(ctx context.Context, opts ProxyOptions) (*Proxy, error) { |
230 | + logger := zerolog.Ctx(ctx) |
231 | + errorLogger := stdlibLog.New(logger, "http-proxy", stdlibLog.LstdFlags|stdlibLog.LUTC) // wrap the existing logger in the stdlib log for http.Server |
232 | + var clientTLSCfg *tls.Config |
233 | + if opts.IgnoreUpstreamHostVerify { |
234 | + clientTLSCfg = &tls.Config{ |
235 | + InsecureSkipVerify: opts.IgnoreUpstreamHostVerify, |
236 | + } |
237 | + } |
238 | + if len(opts.BackendAddr) == 0 { |
239 | + opts.BackendAddr = "0.0.0.0" |
240 | + } |
241 | + backendAddr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort(opts.BackendAddr, "0")) |
242 | + if err != nil { |
243 | + return nil, err |
244 | + } |
245 | + frontendAddr, err := net.ResolveTCPAddr("tcp", opts.FrontendAddr) |
246 | + if err != nil { |
247 | + return nil, err |
248 | + } |
249 | + httpDialer := &net.Dialer{ |
250 | + Timeout: opts.ConnectTimeout, |
251 | + LocalAddr: backendAddr, |
252 | + Cancel: ctx.Done(), |
253 | + } |
254 | + if opts.NoKeepAlive { |
255 | + httpDialer.KeepAlive = -1 |
256 | + } |
257 | + httpsDialer := &tls.Dialer{ |
258 | + NetDialer: httpDialer, |
259 | + Config: clientTLSCfg, |
260 | + } |
261 | + if opts.HTTP2Enabled { |
262 | + opts.FrontendTLSCfg.NextProtos = []string{"h2"} |
263 | + } |
264 | + p := &Proxy{ |
265 | + srvr: &http.Server{ |
266 | + Addr: opts.FrontendAddr, |
267 | + TLSConfig: opts.FrontendTLSCfg, |
268 | + ReadTimeout: opts.ReadTimeout, |
269 | + ReadHeaderTimeout: opts.HdrRdTimeout, |
270 | + WriteTimeout: opts.WriteTimeout, |
271 | + IdleTimeout: opts.IdleTimeout, |
272 | + ErrorLog: errorLogger, |
273 | + BaseContext: func(_ net.Listener) context.Context { |
274 | + return ctx |
275 | + }, |
276 | + }, |
277 | + client: &http.Client{ |
278 | + Transport: &http.Transport{ |
279 | + Proxy: nil, |
280 | + DialContext: httpDialer.DialContext, |
281 | + DialTLSContext: httpsDialer.DialContext, |
282 | + TLSClientConfig: clientTLSCfg, |
283 | + TLSHandshakeTimeout: opts.UpstreamTLSTimeout, |
284 | + DisableKeepAlives: opts.NoKeepAlive, |
285 | + DisableCompression: opts.NoCompression, |
286 | + MaxIdleConns: opts.UpstreamMaxIdleConns, |
287 | + MaxConnsPerHost: opts.UpstreamMaxConnsPerHost, |
288 | + IdleConnTimeout: opts.IdleTimeout, |
289 | + ResponseHeaderTimeout: opts.HdrRdTimeout, |
290 | + MaxResponseHeaderBytes: int64(opts.MaxHeaderSize), |
291 | + WriteBufferSize: opts.WriteBufSize, |
292 | + ReadBufferSize: opts.ReadBufSize, |
293 | + ForceAttemptHTTP2: opts.HTTP2Enabled, |
294 | + }, |
295 | + }, |
296 | + opts: &opts, |
297 | + selfAddr: frontendAddr, |
298 | + } |
299 | + p.srvr.Handler = p |
300 | + return p, nil |
301 | +} |
302 | + |
303 | +func (p *Proxy) Name() string { |
304 | + return "http_proxy" |
305 | +} |
306 | + |
307 | +func (p *Proxy) Type() int { |
308 | + return service.SvcPROXY |
309 | +} |
310 | + |
311 | +func (p *Proxy) PID() int { |
312 | + return os.Getpid() |
313 | +} |
314 | + |
315 | +func (p *Proxy) Start(ctx context.Context) (err error) { |
316 | + if !p.enabled { |
317 | + return nil |
318 | + } |
319 | + var listener net.Listener |
320 | + if p.opts.FrontendTLSCfg != nil { |
321 | + listener, err = tls.Listen("tcp", p.opts.FrontendAddr, p.opts.FrontendTLSCfg) |
322 | + } else { |
323 | + listener, err = net.Listen("tcp", p.opts.FrontendAddr) |
324 | + } |
325 | + if err != nil { |
326 | + return err |
327 | + } |
328 | + p.getCtx = func() context.Context { |
329 | + return ctx |
330 | + } |
331 | + go p.srvr.Serve(listener) |
332 | + return nil |
333 | +} |
334 | + |
335 | +func (p *Proxy) Stop(ctx context.Context) error { |
336 | + return p.srvr.Shutdown(ctx) |
337 | +} |
338 | + |
339 | +func (p *Proxy) checkForCircularReq(r *http.Request) (err error) { |
340 | + port := r.URL.Port() |
341 | + if len(port) == 0 { |
342 | + if r.URL.Scheme == "https" { |
343 | + port = "443" |
344 | + } else { |
345 | + port = "80" |
346 | + } |
347 | + } |
348 | + var ips []net.IP |
349 | + ip := net.ParseIP(r.URL.Hostname()) |
350 | + if ip == nil { |
351 | + ips, err = net.DefaultResolver.LookupIP(p.getCtx(), "ip", r.URL.Hostname()) |
352 | + if err != nil { |
353 | + return err |
354 | + } |
355 | + } else { |
356 | + ips = []net.IP{ip} |
357 | + } |
358 | + for _, iaddr := range ips { |
359 | + remoteAddr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort(iaddr.String(), port)) |
360 | + if err != nil { |
361 | + return err |
362 | + } |
363 | + if remoteAddr.String() == p.selfAddr.String() { |
364 | + return ErrCircularRequest |
365 | + } |
366 | + } |
367 | + return nil |
368 | +} |
369 | + |
370 | +func (p *Proxy) setForwardingHeaders(h http.Header, req *http.Request) { |
371 | + h.Set("X-Forwarded-For", req.RemoteAddr) |
372 | + h.Set("X-Forwarded-Host", req.Host) |
373 | + h.Set("Host", req.Host) |
374 | + h.Set("Forwarded", fmt.Sprintf("by=%s", p.selfAddr.String())) |
375 | + h.Add("Forwarded", fmt.Sprintf("for=%s", req.RemoteAddr)) |
376 | + h.Add("Forwarded", fmt.Sprintf("host=%s", req.Host)) |
377 | + h.Add("Forwarded", fmt.Sprintf("proto=%s", req.URL.Scheme)) |
378 | + h.Set("Origin", fmt.Sprintf("%s://%s", req.URL.Scheme, req.URL.Host)) |
379 | +} |
380 | + |
381 | +func (p *Proxy) handleNetError(ctx context.Context, req *http.Request, w http.ResponseWriter, err error) { |
382 | + if err == net.ErrClosed || err == syscall.ECONNREFUSED { |
383 | + w.WriteHeader(http.StatusServiceUnavailable) |
384 | + return |
385 | + } |
386 | + if err == syscall.ECONNRESET || err == syscall.EHOSTUNREACH || err == syscall.EIO || err == syscall.EPIPE { |
387 | + w.WriteHeader(http.StatusBadGateway) |
388 | + return |
389 | + } |
390 | + if timeout, ok := err.(net.Error); (ok && timeout.Timeout()) || err == syscall.ETIMEDOUT { |
391 | + w.WriteHeader(http.StatusGatewayTimeout) |
392 | + return |
393 | + } |
394 | + w.WriteHeader(http.StatusInternalServerError) |
395 | +} |
396 | + |
397 | +func (p *Proxy) handleResponseError(ctx context.Context, req *http.Request, resp *http.Response, w http.ResponseWriter, err error) { |
398 | + logger := zerolog.Ctx(ctx) |
399 | + if resp == nil { |
400 | + logger.Err(err).Msgf("no response received") |
401 | + p.handleNetError(ctx, req, w, err) |
402 | + return |
403 | + } |
404 | + logger.Err(err).Msgf("forward error to: %s, status %d", req.URL.RequestURI(), resp.StatusCode) |
405 | + switch resp.StatusCode { |
406 | + case 500, 400, 401, 402, 403, 404, 406, 409, 410, 411, 412, 413, 414, 415, 416, 417, 422, 423, 424, 425, 428, 431, 451, 501, 507, 510, 511: |
407 | + w.WriteHeader(resp.StatusCode) |
408 | + return |
409 | + case 502, 503, 504, 505, 506: |
410 | + w.WriteHeader(http.StatusBadGateway) |
411 | + return |
412 | + default: |
413 | + p.handleNetError(ctx, req, w, err) |
414 | + } |
415 | +} |
416 | + |
417 | +func (p *Proxy) PassResponseHeaders(w http.ResponseWriter, resp *http.Response) { |
418 | + for k, v := range resp.Header { |
419 | + w.Header().Set(k, v[0]) |
420 | + if len(v) > 1 { |
421 | + for _, val := range v[1:] { |
422 | + w.Header().Add(k, val) |
423 | + } |
424 | + } |
425 | + } |
426 | +} |
427 | + |
428 | +func (p *Proxy) checkIsInternal(r *http.Request) error { |
429 | + hostPortSlice := strings.Split(r.RemoteAddr, ":") |
430 | + remoteHost, _ := hostPortSlice[0], hostPortSlice[1] |
431 | + for _, internalHost := range []string{"localhost", "127.0.0.1", "::1"} { |
432 | + if remoteHost == internalHost { |
433 | + return nil |
434 | + } |
435 | + } |
436 | + return ErrNotInternalAddress |
437 | +} |
438 | + |
439 | +func (p *Proxy) checkIPAllowed(remoteAddr string) error { |
440 | + remoteHost, _, err := net.SplitHostPort(remoteAddr) |
441 | + if err != nil { |
442 | + return err |
443 | + } |
444 | + remoteIP := net.ParseIP(remoteHost) |
445 | + _, localNet, err := net.ParseCIDR("127.0.0.0/8") |
446 | + if err != nil { |
447 | + return err |
448 | + } |
449 | + for _, allowed := range append(p.allowedCidrs, localNet) { |
450 | + if !allowed.Contains(remoteIP) { |
451 | + return ErrClientNotAllowed |
452 | + } |
453 | + } |
454 | + return nil |
455 | +} |
456 | + |
457 | +func (p *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { |
458 | + logger := zerolog.Ctx(p.getCtx()) |
459 | + defer r.Body.Close() |
460 | + err := p.checkIPAllowed(r.RemoteAddr) |
461 | + if err != nil { |
462 | + w.WriteHeader(http.StatusForbidden) |
463 | + return |
464 | + } |
465 | + fwdURL := r.URL.String() |
466 | + if len(r.URL.Scheme) == 0 { |
467 | + if r.URL.Port() == "443" { |
468 | + fwdURL = "https:" + fwdURL |
469 | + } else { |
470 | + fwdURL = "http:" + fwdURL |
471 | + } |
472 | + } |
473 | + logger.Debug().Msgf("%s requesting %s %s", r.RemoteAddr, r.Method, fwdURL) |
474 | + err = p.checkForCircularReq(r) |
475 | + if err != nil { |
476 | + w.WriteHeader(http.StatusLoopDetected) |
477 | + return |
478 | + } |
479 | + fwdReq, err := http.NewRequestWithContext(p.getCtx(), r.Method, fwdURL, r.Body) |
480 | + if err != nil { |
481 | + w.WriteHeader(http.StatusInternalServerError) |
482 | + return |
483 | + } |
484 | + fwdReq.Header = r.Header.Clone() |
485 | + p.setForwardingHeaders(fwdReq.Header, r) |
486 | + resp, err := p.client.Do(fwdReq) |
487 | + if err != nil { |
488 | + p.handleResponseError(p.getCtx(), r, resp, w, err) |
489 | + return |
490 | + } |
491 | + defer logger.Debug().Msgf("%s responding to %s, %s", fwdURL, r.RemoteAddr, resp.Status) |
492 | + defer resp.Body.Close() |
493 | + if resp.StatusCode >= 400 { |
494 | + p.handleResponseError(p.getCtx(), r, resp, w, nil) |
495 | + return |
496 | + } |
497 | + p.PassResponseHeaders(w, resp) |
498 | + w.WriteHeader(resp.StatusCode) |
499 | + _, err = io.Copy(w, resp.Body) |
500 | + if err != nil { |
501 | + logger := zerolog.Ctx(p.getCtx()) |
502 | + logger.Err(err).Msgf("writing body to %s", r.RemoteAddr) |
503 | + } |
504 | +} |
505 | + |
506 | +func (p *Proxy) Restart(ctx context.Context) error { |
507 | + err := p.Stop(ctx) |
508 | + if err != nil { |
509 | + return err |
510 | + } |
511 | + return p.Start(ctx) |
512 | +} |
513 | + |
514 | +func (p *Proxy) Status(_ context.Context) error { |
515 | + return nil |
516 | +} |
517 | + |
518 | +func (p *Proxy) Configure(ctx context.Context, enabled, preferV4 bool, port int16, allowedCidrs []*net.IPNet) (err error) { |
519 | + p.Lock() |
520 | + defer p.Unlock() |
521 | + p.enabled = enabled |
522 | + // TODO resolve DNS for IPv4 first if preferV4 |
523 | + |
524 | + p.opts.FrontendAddr = net.JoinHostPort(p.selfAddr.IP.String(), strconv.Itoa(int(port))) |
525 | + p.selfAddr, err = net.ResolveTCPAddr("tcp", p.opts.FrontendAddr) |
526 | + if err != nil { |
527 | + return err |
528 | + } |
529 | + p.allowedCidrs = allowedCidrs |
530 | + return p.Restart(ctx) |
531 | +} |
532 | diff --git a/src/rackd_spike/internal/http/reverse.go b/src/rackd_spike/internal/http/reverse.go |
533 | new file mode 100644 |
534 | index 0000000..53b801b |
535 | --- /dev/null |
536 | +++ b/src/rackd_spike/internal/http/reverse.go |
537 | @@ -0,0 +1,234 @@ |
538 | +package http |
539 | + |
540 | +import ( |
541 | + "context" |
542 | + "crypto/tls" |
543 | + "errors" |
544 | + "fmt" |
545 | + "io" |
546 | + "net" |
547 | + "net/http" |
548 | + "net/url" |
549 | + "os" |
550 | + "path/filepath" |
551 | + "strconv" |
552 | + "sync" |
553 | + |
554 | + "github.com/rs/zerolog" |
555 | + |
556 | + machinehelpers "rackd/internal/machine_helpers" |
557 | + "rackd/internal/service" |
558 | +) |
559 | + |
var (
	// ErrNoUpstreams is returned by pickRegionHost when no upstream region
	// has been configured.
	ErrNoUpstreams = errors.New("no upstreams available to proxy")
)
563 | + |
// RevProxyService is a service.Service that can be configured with a list of
// upstream regions (see ReverseProxy.Configure for the accepted forms).
type RevProxyService interface {
	service.Service
	Configure(context.Context, []string) error
}
568 | + |
// ReverseProxy serves a fixed set of routes, forwarding most of them to one
// of the configured upstream regions. It embeds Proxy (by value) and reuses
// its server, client and error handling.
type ReverseProxy struct {
	sync.Mutex                     // guards upstreamRegions in pickRegionHost
	Proxy                          // embedded forward proxy; its server's Handler is replaced by the route mux
	upstreamRegions []*net.TCPAddr // upstream regions, rotated by pickRegionHost
	resourcePath    string         // base directory ServeImages reads files from; set by Configure
}
575 | + |
576 | +func NewReverseProxy(ctx context.Context, machineResourcePath, bindAddr string, port int) (*ReverseProxy, error) { |
577 | + return NewReverseProxyWithTLS(ctx, machineResourcePath, bindAddr, port, nil) |
578 | +} |
579 | + |
580 | +func NewReverseProxyWithTLS(ctx context.Context, machineResourcePath, bindAddr string, port int, tlsCfg *tls.Config) (*ReverseProxy, error) { |
581 | + addr := net.JoinHostPort(bindAddr, strconv.Itoa(port)) |
582 | + return NewReverseProxyWithOptions(ctx, machineResourcePath, ProxyOptions{ |
583 | + FrontendAddr: addr, |
584 | + FrontendTLSCfg: tlsCfg, |
585 | + }) |
586 | +} |
587 | + |
588 | +func NewReverseProxyWithOptions(ctx context.Context, machineResourcePath string, opts ProxyOptions) (*ReverseProxy, error) { |
589 | + p, err := NewProxyWithOptions(ctx, opts) |
590 | + if err != nil { |
591 | + return nil, err |
592 | + } |
593 | + r := &ReverseProxy{ |
594 | + Proxy: *p, |
595 | + } |
596 | + mux := http.NewServeMux() |
597 | + mux.HandleFunc("/MAAS/", r.ServeMAAS) |
598 | + mux.Handle("/machine-resources/", http.FileServer(http.FS(os.DirFS(machineResourcePath)))) |
599 | + mux.HandleFunc("/images/", r.ServeImages) |
600 | + mux.HandleFunc("/log", r.ServeLog) |
601 | + mux.HandleFunc("/", r.ServeBoot) |
602 | + r.Proxy.srvr.Handler = mux |
603 | + return r, nil |
604 | +} |
605 | + |
606 | +func (r *ReverseProxy) Name() string { |
607 | + return "http_reverse_proxy" |
608 | +} |
609 | + |
610 | +func (p *ReverseProxy) pickRegionHost() (*net.TCPAddr, error) { |
611 | + p.Lock() |
612 | + defer p.Unlock() |
613 | + if len(p.upstreamRegions) == 0 { |
614 | + return nil, ErrNoUpstreams |
615 | + } |
616 | + region := p.upstreamRegions[0] |
617 | + if len(p.upstreamRegions) > 1 { |
618 | + p.upstreamRegions = append(p.upstreamRegions[1:], region) // place the selected at end of list for round robin-ing |
619 | + } |
620 | + return region, nil |
621 | +} |
622 | + |
623 | +func (p *ReverseProxy) proxyPass(ctx context.Context, fwdURL string, w http.ResponseWriter, r *http.Request) { |
624 | + logger := zerolog.Ctx(ctx) |
625 | + req, err := http.NewRequestWithContext(ctx, r.Method, fwdURL, r.Body) |
626 | + if err != nil { |
627 | + w.WriteHeader(http.StatusInternalServerError) |
628 | + return |
629 | + } |
630 | + req.Header = r.Header.Clone() |
631 | + p.setForwardingHeaders(req.Header, r) |
632 | + resp, err := p.client.Do(req) |
633 | + if err != nil { |
634 | + p.handleResponseError(ctx, r, resp, w, err) |
635 | + return |
636 | + } |
637 | + if resp.StatusCode >= 400 { |
638 | + p.handleResponseError(ctx, r, resp, w, nil) |
639 | + return |
640 | + } |
641 | + p.PassResponseHeaders(w, resp) |
642 | + w.WriteHeader(resp.StatusCode) |
643 | + _, err = io.Copy(w, resp.Body) |
644 | + if err != nil { |
645 | + logger.Err(err).Msgf("writing body to %s", r.RemoteAddr) |
646 | + } |
647 | +} |
648 | + |
649 | +func (p *ReverseProxy) ServeMAAS(w http.ResponseWriter, r *http.Request) { |
650 | + ctx := p.getCtx() |
651 | + defer r.Body.Close() |
652 | + regionAddr, err := p.pickRegionHost() |
653 | + if err != nil { |
654 | + w.WriteHeader(http.StatusBadGateway) |
655 | + return |
656 | + } |
657 | + fwdURL := "http://" + regionAddr.String() + r.URL.RequestURI() |
658 | + p.proxyPass(ctx, fwdURL, w, r) |
659 | +} |
660 | + |
661 | +func (p *ReverseProxy) ServeImages(w http.ResponseWriter, r *http.Request) { |
662 | + ctx := p.getCtx() |
663 | + authReq, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("http://"+p.opts.FrontendAddr+"/log"), http.NoBody) |
664 | + if err != nil { |
665 | + w.WriteHeader(http.StatusInternalServerError) |
666 | + return |
667 | + } |
668 | + resp, err := p.client.Do(authReq) |
669 | + if err != nil { |
670 | + p.handleResponseError(ctx, r, resp, w, err) |
671 | + return |
672 | + } |
673 | + if resp.StatusCode >= 400 { |
674 | + p.handleResponseError(ctx, r, resp, w, nil) |
675 | + return |
676 | + } |
677 | + resource := filepath.Join(p.resourcePath, r.URL.Path) |
678 | + f, err := os.Open(resource) |
679 | + if err != nil { |
680 | + if err == os.ErrNotExist { |
681 | + w.WriteHeader(http.StatusNotFound) |
682 | + return |
683 | + } |
684 | + if err == os.ErrPermission { |
685 | + w.WriteHeader(http.StatusForbidden) |
686 | + return |
687 | + } |
688 | + w.WriteHeader(http.StatusInternalServerError) |
689 | + return |
690 | + } |
691 | + defer f.Close() |
692 | + w.WriteHeader(http.StatusOK) |
693 | + _, err = io.Copy(w, f) |
694 | + if err != nil { |
695 | + logger := zerolog.Ctx(ctx) |
696 | + logger.Err(err).Msgf("writing body to %s", r.RemoteAddr) |
697 | + } |
698 | +} |
699 | + |
700 | +func (p *ReverseProxy) ServeLog(w http.ResponseWriter, r *http.Request) { |
701 | + ctx := p.getCtx() |
702 | + defer r.Body.Close() |
703 | + err := p.checkIsInternal(r) |
704 | + if err != nil { |
705 | + w.WriteHeader(http.StatusForbidden) |
706 | + return |
707 | + } |
708 | + region, err := p.pickRegionHost() |
709 | + if err != nil { |
710 | + w.WriteHeader(http.StatusBadGateway) |
711 | + return |
712 | + } |
713 | + regionLogURL := "http://" + net.JoinHostPort(region.IP.String(), "5249") + r.URL.RequestURI() |
714 | + req, err := http.NewRequestWithContext(ctx, r.Method, regionLogURL, r.Body) |
715 | + if err != nil { |
716 | + w.WriteHeader(http.StatusInternalServerError) |
717 | + return |
718 | + } |
719 | + req.Header = r.Header.Clone() |
720 | + req.Header.Set("X-Original-URI", r.URL.String()) |
721 | + req.Header.Set("X-Original-Remote-IP", r.RemoteAddr) |
722 | + resp, err := p.client.Do(req) |
723 | + if err != nil { |
724 | + p.handleResponseError(ctx, r, resp, w, err) |
725 | + return |
726 | + } |
727 | + if resp.StatusCode >= 400 { |
728 | + p.handleResponseError(ctx, r, resp, w, nil) |
729 | + return |
730 | + } |
731 | + p.PassResponseHeaders(w, resp) |
732 | + w.WriteHeader(resp.StatusCode) |
733 | + _, err = io.Copy(w, r.Body) |
734 | + if err != nil { |
735 | + logger := zerolog.Ctx(ctx) |
736 | + logger.Err(err).Msgf("writing body to %s", r.RemoteAddr) |
737 | + } |
738 | +} |
739 | + |
740 | +func (p *ReverseProxy) ServeBoot(w http.ResponseWriter, r *http.Request) { |
741 | + ctx := p.getCtx() |
742 | + region, err := p.pickRegionHost() |
743 | + if err != nil { |
744 | + w.WriteHeader(http.StatusBadGateway) |
745 | + return |
746 | + } |
747 | + fwdURL := "http://" + net.JoinHostPort(region.IP.String(), "5249") + r.URL.RequestURI() |
748 | + p.proxyPass(ctx, fwdURL, w, r) |
749 | +} |
750 | + |
751 | +func (p *ReverseProxy) Configure(ctx context.Context, regions []string) (err error) { |
752 | + p.resourcePath, err = machinehelpers.GetResourcesBinPath() |
753 | + if err != nil { |
754 | + return err |
755 | + } |
756 | + p.upstreamRegions = make([]*net.TCPAddr, len(regions)) |
757 | + for i, region := range regions { |
758 | + regionURL, err := url.Parse(region) |
759 | + if err != nil { // if not url, try as host or IP |
760 | + p.upstreamRegions[i], err = net.ResolveTCPAddr("tcp", net.JoinHostPort(region, "5240")) |
761 | + if err != nil { |
762 | + return err |
763 | + } |
764 | + } |
765 | + p.upstreamRegions[i], err = net.ResolveTCPAddr("tcp", regionURL.Host) |
766 | + if err != nil { |
767 | + return err |
768 | + } |
769 | + } |
770 | + return p.Restart(ctx) |
771 | +} |
772 | diff --git a/src/rackd_spike/internal/service/supervisor.go b/src/rackd_spike/internal/service/supervisor.go |
773 | index 09ba25e..ca9bd36 100644 |
774 | --- a/src/rackd_spike/internal/service/supervisor.go |
775 | +++ b/src/rackd_spike/internal/service/supervisor.go |
776 | @@ -24,6 +24,8 @@ var ( |
777 | ErrInvalidServiceState = errors.New("service is in an invalid state for this operation") |
778 | ErrInvalidServiceType = errors.New("invalid service type for given service") |
779 | ErrUnexpectedServiceExit = errors.New("service exited unexpectedly") |
780 | + ErrUnsuccessfulStart = errors.New("not all services started correctly") |
781 | + ErrUnsuccessfulStop = errors.New("not all services shutdown correctly") |
782 | ) |
783 | |
784 | // Service is an interface outlining behavior to manage external services |
785 | diff --git a/src/rackd_spike/internal/transport/rpc.go b/src/rackd_spike/internal/transport/rpc.go |
786 | index 4298c93..b71cdc8 100644 |
787 | --- a/src/rackd_spike/internal/transport/rpc.go |
788 | +++ b/src/rackd_spike/internal/transport/rpc.go |
789 | @@ -198,3 +198,14 @@ func (r *RPCManager) GetHandler(handlerName string) (RPCHandler, error) { |
790 | } |
791 | return h, nil |
792 | } |
793 | + |
794 | +func (r *RPCManager) ConnsToString() []string { |
795 | + res := make([]string, len(r.conns)) |
796 | + |
797 | + var idx int |
798 | + for k := range r.conns { // map key should be remote address of conn |
799 | + res[idx] = k |
800 | + idx++ |
801 | + } |
802 | + return res |
803 | +} |
804 | diff --git a/src/rackd_spike/pkg/http/proxy.go b/src/rackd_spike/pkg/http/proxy.go |
805 | new file mode 100644 |
806 | index 0000000..caeede0 |
807 | --- /dev/null |
808 | +++ b/src/rackd_spike/pkg/http/proxy.go |
809 | @@ -0,0 +1,110 @@ |
810 | +package http |
811 | + |
812 | +import ( |
813 | + "context" |
814 | + "net" |
815 | + internal "rackd/internal/http" |
816 | + "rackd/internal/metrics" |
817 | + "rackd/internal/service" |
818 | + "rackd/internal/transport" |
819 | + "rackd/pkg/rpc" |
820 | + "sync" |
821 | +) |
822 | + |
// Client is the RPC client for the HTTP proxy service: a transport.RPCClient
// that can additionally request the proxy configuration for a system id.
type Client interface {
	transport.RPCClient
	GetProxyConfiguration(context.Context, string) error
}
827 | + |
// CapnpClient implements Client on top of capnp region controller clients.
type CapnpClient struct {
	sync.Mutex                                      // guards clients and lastGoodClient
	svc            internal.ProxyService            // proxy service obtained from the supervisor in New
	clients        map[string]*rpc.RegionController // region clients keyed by the connection's remote address
	lastGoodClient string                           // key of the client last selected by getClient
	needReset      bool                             // passed to getClient to force selecting a client anew
}
835 | + |
836 | +func New(sup service.SvcManager) (Client, error) { |
837 | + svcs, err := sup.GetByType(service.SvcPROXY) |
838 | + if err != nil { |
839 | + return nil, err |
840 | + } |
841 | + svc, ok := svcs[0].(internal.ProxyService) |
842 | + if !ok { |
843 | + |
844 | + } |
845 | + return &CapnpClient{ |
846 | + svc: svc, |
847 | + clients: make(map[string]*rpc.RegionController), |
848 | + }, nil |
849 | +} |
850 | + |
851 | +func (c *CapnpClient) Name() string { |
852 | + return "http" |
853 | +} |
854 | + |
855 | +func (c *CapnpClient) RegisterMetrics(registry *metrics.Registry) error { |
856 | + // TODO |
857 | + return nil |
858 | +} |
859 | + |
860 | +func (c *CapnpClient) SetupClient(ctx context.Context, client *transport.ConnWrapper) { |
861 | + c.Lock() |
862 | + defer c.Unlock() |
863 | + c.clients[client.Conn.RemoteAddr().String()] = client.Capnp() |
864 | +} |
865 | + |
866 | +func (c *CapnpClient) getClient(reset bool) (*rpc.RegionController, error) { |
867 | + c.Lock() |
868 | + defer c.Unlock() |
869 | + if len(c.clients) == 0 { |
870 | + return nil, transport.ErrRPCClientNotFound |
871 | + } |
872 | + if !reset && len(c.lastGoodClient) > 0 { |
873 | + return c.clients[c.lastGoodClient], nil |
874 | + } |
875 | + for k, v := range c.clients { |
876 | + c.lastGoodClient = k |
877 | + return v, nil |
878 | + } |
879 | + return nil, transport.ErrRPCClientNotFound |
880 | +} |
881 | + |
882 | +func (c *CapnpClient) GetProxyConfiguration(ctx context.Context, systemID string) error { |
883 | + client, err := c.getClient(c.needReset) |
884 | + if err != nil { |
885 | + return err |
886 | + } |
887 | + resp, release := client.GetProxyConfiguration(ctx, func(params rpc.RegionController_getProxyConfiguration_Params) error { |
888 | + return params.SetSystemId(systemID) |
889 | + }) |
890 | + defer release() |
891 | + cfgResp, err := resp.Struct() |
892 | + if err != nil { |
893 | + return err |
894 | + } |
895 | + cfg, err := cfgResp.ProxyConfig() |
896 | + if err != nil { |
897 | + return err |
898 | + } |
899 | + enabled := cfg.Enabled() |
900 | + preferV4 := cfg.PreferV4Proxy() |
901 | + port := cfg.Port() |
902 | + allowedCidrsProto, err := cfg.AllowedCidrs() |
903 | + if err != nil { |
904 | + return err |
905 | + } |
906 | + allowedCidrs := make([]*net.IPNet, allowedCidrsProto.Len()) |
907 | + for i := 0; i < allowedCidrsProto.Len(); i++ { |
908 | + cidrProto, err := allowedCidrsProto.At(i) |
909 | + if err != nil { |
910 | + return err |
911 | + } |
912 | + _, cidr, err := net.ParseCIDR(cidrProto) |
913 | + if err != nil { |
914 | + return err |
915 | + } |
916 | + allowedCidrs[i] = cidr |
917 | + } |
918 | + return c.svc.Configure(ctx, enabled, preferV4, port, allowedCidrs) |
919 | +} |
920 | diff --git a/src/rackd_spike/pkg/region/config.go b/src/rackd_spike/pkg/region/config.go |
921 | index 2ad76a9..c199c91 100644 |
922 | --- a/src/rackd_spike/pkg/region/config.go |
923 | +++ b/src/rackd_spike/pkg/region/config.go |
924 | @@ -3,19 +3,53 @@ package region |
925 | import ( |
926 | "context" |
927 | |
928 | + "github.com/rs/zerolog" |
929 | + |
930 | "rackd/internal/config" |
931 | + httpInternal "rackd/internal/http" |
932 | + "rackd/internal/service" |
933 | "rackd/internal/transport" |
934 | + "rackd/pkg/http" |
935 | "rackd/pkg/ntp" |
936 | ) |
937 | |
938 | -func GetRemoteConfig(ctx context.Context, rpcMgr *transport.RPCManager) error { |
939 | +func GetRemoteConfig(ctx context.Context, rpcMgr *transport.RPCManager, sup service.SvcManager) error { |
940 | + log := zerolog.Ctx(ctx) |
941 | + |
942 | ntpIface, err := rpcMgr.GetClient("ntp") |
943 | if err != nil { |
944 | return err |
945 | } |
946 | - if ntp, ok := ntpIface.(ntp.Client); ok { |
947 | - return ntp.GetTimeConfiguration(ctx, config.Config.SystemID) |
948 | + |
949 | + proxyIface, err := rpcMgr.GetClient("proxy") |
950 | + if err != nil { |
951 | + return err |
952 | + } |
953 | + |
954 | + revProxyIface, err := sup.Get("http_reverse_proxy") |
955 | + if err != nil { |
956 | + return err |
957 | + } |
958 | + |
959 | + if n, ok := ntpIface.(ntp.Client); ok { |
960 | + log.Info().Msg("configuring NTP") |
961 | + err = n.GetTimeConfiguration(ctx, config.Config.SystemID) |
962 | + if err != nil { |
963 | + return err |
964 | + } |
965 | } |
966 | + if proxy, ok := proxyIface.(http.Client); ok { |
967 | + log.Info().Msg("configuring proxy") |
968 | + err = proxy.GetProxyConfiguration(ctx, config.Config.SystemID) |
969 | + if err != nil { |
970 | + return err |
971 | + } |
972 | + } |
973 | + if revProxy, ok := revProxyIface.(httpInternal.RevProxyService); ok { |
974 | + log.Info().Msg("configuring HTTP reverse proxy") |
975 | + revProxy.Configure(ctx, rpcMgr.ConnsToString()) |
976 | + } |
977 | + |
978 | return nil |
979 | // TODO add other services that fetch config from region controller |
980 | } |
UNIT TESTS
-b rack_spike_http lp:~cgrabowski/maas/+git/maas into -b rack_region_exploration lp:~maas-committers/maas
STATUS: FAILED
LOG: http://maas-ci.internal:8080/job/maas/job/branch-tester/10593/console
COMMIT: c557636756d200ec77051f36feaca06e097c877e