Merge lp:~axwalk/juju-core/instancedistributor-policy into lp:~go-bot/juju-core/trunk

Proposed by Andrew Wilkins
Status: Merged
Merged at revision: 2812
Proposed branch: lp:~axwalk/juju-core/instancedistributor-policy
Merge into: lp:~go-bot/juju-core/trunk
Prerequisite: lp:~axwalk/juju-core/startinstance-principalunit
Diff against target: 293 lines (+181/-32) (has conflicts)
5 files modified
environs/statepolicy.go (+14/-0)
state/apiserver/provisioner/provisioner.go (+3/-20)
state/distribution.go (+77/-0)
state/policy.go (+30/-0)
state/unit.go (+57/-12)
Text conflict in environs/statepolicy.go
Text conflict in state/policy.go
To merge this branch: bzr merge lp:~axwalk/juju-core/instancedistributor-policy
Reviewer Review Type Date Requested Status
Juju Engineering Pending
Review via email: mp+212994@code.launchpad.net

Description of the change

Introduce InstanceDistributor policy

This is a policy that is invoked whenever
we attempt to assign a unit to an empty
machine.

The policy will be invoked with the
candidate instances (i.e. those of provisioned,
empty machines) and the distribution group
instances (i.e. all other instances which
have assigned units of the same service).

Azure will return nil, whereas ec2 will
want to check which availability zone each
machine is in and distribute for maximum
distance.

https://codereview.appspot.com/81340043/

To post a comment you must log in.

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'environs/statepolicy.go'
--- environs/statepolicy.go 2014-03-25 13:01:16 +0000
+++ environs/statepolicy.go 2014-03-27 07:16:15 +0000
@@ -33,7 +33,21 @@
33 }33 }
34 return nil, errors.NewNotImplementedError("Prechecker")34 return nil, errors.NewNotImplementedError("Prechecker")
35}35}
36<<<<<<< TREE
3637
37func (environStatePolicy) ConfigValidator(providerType string) (state.ConfigValidator, error) {38func (environStatePolicy) ConfigValidator(providerType string) (state.ConfigValidator, error) {
38 return Provider(providerType)39 return Provider(providerType)
39}40}
41=======
42
43func (environStatePolicy) InstanceDistributor(cfg *config.Config) (state.InstanceDistributor, error) {
44 env, err := New(cfg)
45 if err != nil {
46 return nil, err
47 }
48 if p, ok := env.(state.InstanceDistributor); ok {
49 return p, nil
50 }
51 return nil, errors.NewNotImplementedError("InstanceDistributor")
52}
53>>>>>>> MERGE-SOURCE
4054
=== modified file 'state/apiserver/provisioner/provisioner.go'
--- state/apiserver/provisioner/provisioner.go 2014-03-27 07:16:13 +0000
+++ state/apiserver/provisioner/provisioner.go 2014-03-27 07:16:15 +0000
@@ -351,29 +351,12 @@
351 if err != nil {351 if err != nil {
352 return nil, err352 return nil, err
353 }353 }
354 allUnits, err := service.AllUnits()354 instanceIds, err := service.ServiceInstances()
355 if err != nil {355 if err != nil {
356 return nil, err356 return nil, err
357 }357 }
358 for _, unit := range allUnits {358 for _, instanceId := range instanceIds {
359 machineId, err := unit.AssignedMachineId()359 instanceIdSet.Add(string(instanceId))
360 if state.IsNotAssigned(err) {
361 continue
362 } else if err != nil {
363 return nil, err
364 }
365 machine, err := st.Machine(machineId)
366 if err != nil {
367 return nil, err
368 }
369 instanceId, err := machine.InstanceId()
370 if err == nil {
371 instanceIdSet.Add(string(instanceId))
372 } else if state.IsNotProvisionedError(err) {
373 continue
374 } else {
375 return nil, err
376 }
377 }360 }
378 }361 }
379 instanceIds := make([]instance.Id, instanceIdSet.Size())362 instanceIds := make([]instance.Id, instanceIdSet.Size())
380363
=== added file 'state/distribution.go'
--- state/distribution.go 1970-01-01 00:00:00 +0000
+++ state/distribution.go 2014-03-27 07:16:15 +0000
@@ -0,0 +1,77 @@
1// Copyright 2014 Canonical Ltd.
2// Licensed under the AGPLv3, see LICENCE file for details.
3
4package state
5
6import (
7 "fmt"
8
9 "launchpad.net/juju-core/errors"
10 "launchpad.net/juju-core/instance"
11)
12
13// distribute takes a unit and set of clean, empty instances and asks the
14// InstanceDistributor policy (if any) which ones are suitable for assigning
15// the unit to. If there is no InstanceDistributor, or the distribution group
16// is empty, then all of the candidates will be returned.
17func (u *Unit) distribute(candidates []instance.Id) ([]instance.Id, error) {
18 if u.st.policy == nil {
19 return candidates, nil
20 }
21 cfg, err := u.st.EnvironConfig()
22 if err != nil {
23 return nil, err
24 }
25 distributor, err := u.st.policy.InstanceDistributor(cfg)
26 if errors.IsNotImplementedError(err) {
27 return candidates, nil
28 } else if err != nil {
29 return nil, err
30 }
31 if distributor == nil {
32 return nil, fmt.Errorf("policy returned nil instance distributor without an error")
33 }
34 service, err := u.Service()
35 if err != nil {
36 return nil, err
37 }
38 distributionGroup, err := service.ServiceInstances()
39 if err != nil {
40 return nil, err
41 }
42 if len(distributionGroup) == 0 {
43 return candidates, nil
44 }
45 return distributor.DistributeInstances(candidates, distributionGroup)
46}
47
48// ServiceInstances returns the instance IDs of provisioned
49// machines that are assigned units of this service.
50func (service *Service) ServiceInstances() ([]instance.Id, error) {
51 units, err := service.AllUnits()
52 if err != nil {
53 return nil, err
54 }
55 instanceIds := make([]instance.Id, 0, len(units))
56 for _, unit := range units {
57 machineId, err := unit.AssignedMachineId()
58 if IsNotAssigned(err) {
59 continue
60 } else if err != nil {
61 return nil, err
62 }
63 machine, err := service.st.Machine(machineId)
64 if err != nil {
65 return nil, err
66 }
67 instanceId, err := machine.InstanceId()
68 if err == nil {
69 instanceIds = append(instanceIds, instanceId)
70 } else if IsNotProvisionedError(err) {
71 continue
72 } else {
73 return nil, err
74 }
75 }
76 return instanceIds, nil
77}
078
=== modified file 'state/policy.go'
--- state/policy.go 2014-03-03 01:09:00 +0000
+++ state/policy.go 2014-03-27 07:16:15 +0000
@@ -9,6 +9,7 @@
9 "launchpad.net/juju-core/constraints"9 "launchpad.net/juju-core/constraints"
10 "launchpad.net/juju-core/environs/config"10 "launchpad.net/juju-core/environs/config"
11 "launchpad.net/juju-core/errors"11 "launchpad.net/juju-core/errors"
12 "launchpad.net/juju-core/instance"
12)13)
1314
14// Policy is an interface provided to State that may15// Policy is an interface provided to State that may
@@ -24,10 +25,18 @@
24 // Prechecker takes a *config.Config and returns25 // Prechecker takes a *config.Config and returns
25 // a (possibly nil) Prechecker or an error.26 // a (possibly nil) Prechecker or an error.
26 Prechecker(*config.Config) (Prechecker, error)27 Prechecker(*config.Config) (Prechecker, error)
28<<<<<<< TREE
2729
28 // ConfigValidator takes a string (environ type) and returns30 // ConfigValidator takes a string (environ type) and returns
29 // a (possibly nil) ConfigValidator or an error.31 // a (possibly nil) ConfigValidator or an error.
30 ConfigValidator(string) (ConfigValidator, error)32 ConfigValidator(string) (ConfigValidator, error)
33=======
34
35 // InstanceDistributor takes a *config.Config
36 // and returns a (possibly nil) InstanceDistributor
37 // or an error.
38 InstanceDistributor(*config.Config) (InstanceDistributor, error)
39>>>>>>> MERGE-SOURCE
31}40}
3241
33// Prechecker is a policy interface that is provided to State42// Prechecker is a policy interface that is provided to State
@@ -71,6 +80,7 @@
71 }80 }
72 return prechecker.PrecheckInstance(series, cons)81 return prechecker.PrecheckInstance(series, cons)
73}82}
83<<<<<<< TREE
7484
75// validate calls the state's assigned policy, if non-nil, to obtain85// validate calls the state's assigned policy, if non-nil, to obtain
76// a Validator, and calls validate if a non-nil Validator is returned.86// a Validator, and calls validate if a non-nil Validator is returned.
@@ -89,3 +99,23 @@
89 }99 }
90 return configValidator.Validate(cfg, old)100 return configValidator.Validate(cfg, old)
91}101}
102=======
103
104// InstanceDistributor is a policy interface that is provided
105// to State to perform distribution of units across instances
106// for high availability.
107type InstanceDistributor interface {
108 // DistributeInstances takes a set of clean, empty
109 // instances, and a distribution group, and returns
110 // the subset of candidates which the policy will
111 // allow to enter into the distribution group.
112 //
113 // TODO(axw) move this comment
114 // The unit assigner will attempt to assign a unit
115 // to each of the resulting instances until it
116 // succeeds. If no instances can be assigned
117 // (e.g. because of concurrent deployments), then
118 // a new machine will be allocated.
119 DistributeInstances(candidates, distributionGroup []instance.Id) ([]instance.Id, error)
120}
121>>>>>>> MERGE-SOURCE
92122
=== modified file 'state/unit.go'
--- state/unit.go 2014-03-26 12:56:50 +0000
+++ state/unit.go 2014-03-27 07:16:15 +0000
@@ -1225,14 +1225,63 @@
1225 return nil, err1225 return nil, err
1226 }1226 }
12271227
1228 // TODO(rog) Fix so this is more efficient when there are concurrent uses.1228 // Find all of the candidate machines, and associated
1229 // Possible solution: pick the highest and the smallest id of all1229 // instances for those that are provisioned. Instances
1230 // unused machines, and try to assign to the first one >= a random id in the1230 // will be distributed across in preference to
1231 // middle.1231 // unprovisioned machines.
1232 iter := query.Batch(1).Prefetch(0).Iter()1232 var mdocs []*machineDoc
1233 var mdoc machineDoc1233 if err := query.All(&mdocs); err != nil {
1234 for iter.Next(&mdoc) {1234 assignContextf(&err, u, context)
1235 m := newMachine(u.st, &mdoc)1235 return nil, err
1236 }
1237 var unprovisioned []*Machine
1238 var instances []instance.Id
1239 instanceMachines := make(map[instance.Id]*Machine)
1240 for _, mdoc := range mdocs {
1241 m := newMachine(u.st, mdoc)
1242 instance, err := m.InstanceId()
1243 if IsNotProvisionedError(err) {
1244 unprovisioned = append(unprovisioned, m)
1245 } else if err != nil {
1246 assignContextf(&err, u, context)
1247 return nil, err
1248 } else {
1249 instances = append(instances, instance)
1250 instanceMachines[instance] = m
1251 }
1252 }
1253
1254 // Filter the list of instances that are suitable for
1255 // distribution, and then map them back to machines.
1256 //
1257 // TODO(axw) 2014-03-27 #XXXXXX
1258 // Shuffle machines to reduce likelihood of collisions.
1259 // The partition of provisioned/unprovisioned machines
1260 // must be maintained.
1261 if instances, err = u.distribute(instances); err != nil {
1262 assignContextf(&err, u, context)
1263 return nil, err
1264 }
1265 machines := make([]*Machine, len(instances), len(instances)+len(unprovisioned))
1266 for i, instance := range instances {
1267 m, ok := instanceMachines[instance]
1268 if !ok {
1269 err := fmt.Errorf("invalid instance returned: %v", instance)
1270 assignContextf(&err, u, context)
1271 return nil, err
1272 }
1273 machines[i] = m
1274 }
1275 machines = append(machines, unprovisioned...)
1276
1277 // TODO(axw) 2014-03-27 #XXXXXX
1278 // We should not select a machine that is in the process
1279 // of being provisioned. There's no point asserting that
1280 // the machine hasn't been provisioned, as there'll still
1281 // be a period of time during which the machine may be
1282 // provisioned without the fact having yet been recorded
1283 // in state.
1284 for _, m := range machines {
1236 err := u.assignToMachine(m, true)1285 err := u.assignToMachine(m, true)
1237 if err == nil {1286 if err == nil {
1238 return m, nil1287 return m, nil
@@ -1242,10 +1291,6 @@
1242 return nil, err1291 return nil, err
1243 }1292 }
1244 }1293 }
1245 if err := iter.Err(); err != nil {
1246 assignContextf(&err, u, context)
1247 return nil, err
1248 }
1249 return nil, noCleanMachines1294 return nil, noCleanMachines
1250}1295}
12511296

Subscribers

People subscribed via source and target branches

to status/vote changes: