Merge lp:~axwalk/juju-core/instancedistributor-policy into lp:~go-bot/juju-core/trunk

Proposed by Andrew Wilkins
Status: Merged
Merged at revision: 2812
Proposed branch: lp:~axwalk/juju-core/instancedistributor-policy
Merge into: lp:~go-bot/juju-core/trunk
Prerequisite: lp:~axwalk/juju-core/startinstance-principalunit
Diff against target: 293 lines (+181/-32) (has conflicts)
5 files modified
environs/statepolicy.go (+14/-0)
state/apiserver/provisioner/provisioner.go (+3/-20)
state/distribution.go (+77/-0)
state/policy.go (+30/-0)
state/unit.go (+57/-12)
Text conflict in environs/statepolicy.go
Text conflict in state/policy.go
To merge this branch: bzr merge lp:~axwalk/juju-core/instancedistributor-policy
Reviewer Review Type Date Requested Status
Juju Engineering Pending
Review via email: mp+212994@code.launchpad.net

Description of the change

Introduce InstanceDistributor policy

This is a policy that is invoked whenever
we attempt to assign a unit to an empty
machine.

The policy will be invoked with the
candidate instances (i.e. provisioned,
empty machines) and the distribution group
instances (i.e. all other instances which
have assigned units of the same service).

Azure will return nil, whereas ec2 will
want to check which availability zone each
machine is in and distribute for maximum
distance.

https://codereview.appspot.com/81340043/

To post a comment you must log in.

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'environs/statepolicy.go'
2--- environs/statepolicy.go 2014-03-25 13:01:16 +0000
3+++ environs/statepolicy.go 2014-03-27 07:16:15 +0000
4@@ -33,7 +33,21 @@
5 }
6 return nil, errors.NewNotImplementedError("Prechecker")
7 }
8+<<<<<<< TREE
9
10 func (environStatePolicy) ConfigValidator(providerType string) (state.ConfigValidator, error) {
11 return Provider(providerType)
12 }
13+=======
14+
15+func (environStatePolicy) InstanceDistributor(cfg *config.Config) (state.InstanceDistributor, error) {
16+ env, err := New(cfg)
17+ if err != nil {
18+ return nil, err
19+ }
20+ if p, ok := env.(state.InstanceDistributor); ok {
21+ return p, nil
22+ }
23+ return nil, errors.NewNotImplementedError("InstanceDistributor")
24+}
25+>>>>>>> MERGE-SOURCE
26
27=== modified file 'state/apiserver/provisioner/provisioner.go'
28--- state/apiserver/provisioner/provisioner.go 2014-03-27 07:16:13 +0000
29+++ state/apiserver/provisioner/provisioner.go 2014-03-27 07:16:15 +0000
30@@ -351,29 +351,12 @@
31 if err != nil {
32 return nil, err
33 }
34- allUnits, err := service.AllUnits()
35+ instanceIds, err := service.ServiceInstances()
36 if err != nil {
37 return nil, err
38 }
39- for _, unit := range allUnits {
40- machineId, err := unit.AssignedMachineId()
41- if state.IsNotAssigned(err) {
42- continue
43- } else if err != nil {
44- return nil, err
45- }
46- machine, err := st.Machine(machineId)
47- if err != nil {
48- return nil, err
49- }
50- instanceId, err := machine.InstanceId()
51- if err == nil {
52- instanceIdSet.Add(string(instanceId))
53- } else if state.IsNotProvisionedError(err) {
54- continue
55- } else {
56- return nil, err
57- }
58+ for _, instanceId := range instanceIds {
59+ instanceIdSet.Add(string(instanceId))
60 }
61 }
62 instanceIds := make([]instance.Id, instanceIdSet.Size())
63
64=== added file 'state/distribution.go'
65--- state/distribution.go 1970-01-01 00:00:00 +0000
66+++ state/distribution.go 2014-03-27 07:16:15 +0000
67@@ -0,0 +1,77 @@
68+// Copyright 2014 Canonical Ltd.
69+// Licensed under the AGPLv3, see LICENCE file for details.
70+
71+package state
72+
73+import (
74+ "fmt"
75+
76+ "launchpad.net/juju-core/errors"
77+ "launchpad.net/juju-core/instance"
78+)
79+
80+// distribute takes a unit and set of clean, empty instances and asks the
81+// InstanceDistributor policy (if any) which ones are suitable for assigning
82+// the unit to. If there is no InstanceDistributor, or the distribution group
83+// is empty, then all of the candidates will be returned.
84+func (u *Unit) distribute(candidates []instance.Id) ([]instance.Id, error) {
85+ if u.st.policy == nil {
86+ return candidates, nil
87+ }
88+ cfg, err := u.st.EnvironConfig()
89+ if err != nil {
90+ return nil, err
91+ }
92+ distributor, err := u.st.policy.InstanceDistributor(cfg)
93+ if errors.IsNotImplementedError(err) {
94+ return candidates, nil
95+ } else if err != nil {
96+ return nil, err
97+ }
98+ if distributor == nil {
99+ return nil, fmt.Errorf("policy returned nil instance distributor without an error")
100+ }
101+ service, err := u.Service()
102+ if err != nil {
103+ return nil, err
104+ }
105+ distributionGroup, err := service.ServiceInstances()
106+ if err != nil {
107+ return nil, err
108+ }
109+ if len(distributionGroup) == 0 {
110+ return candidates, nil
111+ }
112+ return distributor.DistributeInstances(candidates, distributionGroup)
113+}
114+
115+// ServiceInstances returns the instance IDs of provisioned
116+// machines that are assigned units of this service.
117+func (service *Service) ServiceInstances() ([]instance.Id, error) {
118+ units, err := service.AllUnits()
119+ if err != nil {
120+ return nil, err
121+ }
122+ instanceIds := make([]instance.Id, 0, len(units))
123+ for _, unit := range units {
124+ machineId, err := unit.AssignedMachineId()
125+ if IsNotAssigned(err) {
126+ continue
127+ } else if err != nil {
128+ return nil, err
129+ }
130+ machine, err := service.st.Machine(machineId)
131+ if err != nil {
132+ return nil, err
133+ }
134+ instanceId, err := machine.InstanceId()
135+ if err == nil {
136+ instanceIds = append(instanceIds, instanceId)
137+ } else if IsNotProvisionedError(err) {
138+ continue
139+ } else {
140+ return nil, err
141+ }
142+ }
143+ return instanceIds, nil
144+}
145
146=== modified file 'state/policy.go'
147--- state/policy.go 2014-03-03 01:09:00 +0000
148+++ state/policy.go 2014-03-27 07:16:15 +0000
149@@ -9,6 +9,7 @@
150 "launchpad.net/juju-core/constraints"
151 "launchpad.net/juju-core/environs/config"
152 "launchpad.net/juju-core/errors"
153+ "launchpad.net/juju-core/instance"
154 )
155
156 // Policy is an interface provided to State that may
157@@ -24,10 +25,18 @@
158 // Prechecker takes a *config.Config and returns
159 // a (possibly nil) Prechecker or an error.
160 Prechecker(*config.Config) (Prechecker, error)
161+<<<<<<< TREE
162
163 // ConfigValidator takes a string (environ type) and returns
164 // a (possibly nil) ConfigValidator or an error.
165 ConfigValidator(string) (ConfigValidator, error)
166+=======
167+
168+ // InstanceDistributor takes a *config.Config
169+ // and returns a (possibly nil) InstanceDistributor
170+ // or an error.
171+ InstanceDistributor(*config.Config) (InstanceDistributor, error)
172+>>>>>>> MERGE-SOURCE
173 }
174
175 // Prechecker is a policy interface that is provided to State
176@@ -71,6 +80,7 @@
177 }
178 return prechecker.PrecheckInstance(series, cons)
179 }
180+<<<<<<< TREE
181
182 // validate calls the state's assigned policy, if non-nil, to obtain
183 // a Validator, and calls validate if a non-nil Validator is returned.
184@@ -89,3 +99,23 @@
185 }
186 return configValidator.Validate(cfg, old)
187 }
188+=======
189+
190+// InstanceDistributor is a policy interface that is provided
191+// to State to perform distribution of units across instances
192+// for high availability.
193+type InstanceDistributor interface {
194+ // DistributeInstances takes a set of clean, empty
195+ // instances, and a distribution group, and returns
196+ // the subset of candidates which the policy will
197+ // allow to enter into the distribution group.
198+ //
199+ // TODO(axw) move this comment
200+ // The unit assigner will attempt to assign a unit
201+ // to each of the resulting instances until it
202+ // succeeds. If no instances can be assigned
203+ // (e.g. because of concurrent deployments), then
204+ // a new machine will be allocated.
205+ DistributeInstances(candidates, distributionGroup []instance.Id) ([]instance.Id, error)
206+}
207+>>>>>>> MERGE-SOURCE
208
209=== modified file 'state/unit.go'
210--- state/unit.go 2014-03-26 12:56:50 +0000
211+++ state/unit.go 2014-03-27 07:16:15 +0000
212@@ -1225,14 +1225,63 @@
213 return nil, err
214 }
215
216- // TODO(rog) Fix so this is more efficient when there are concurrent uses.
217- // Possible solution: pick the highest and the smallest id of all
218- // unused machines, and try to assign to the first one >= a random id in the
219- // middle.
220- iter := query.Batch(1).Prefetch(0).Iter()
221- var mdoc machineDoc
222- for iter.Next(&mdoc) {
223- m := newMachine(u.st, &mdoc)
224+ // Find all of the candidate machines, and associated
225+ // instances for those that are provisioned. Units are
226+ // distributed across provisioned instances in
227+ // preference to unprovisioned machines.
228+ var mdocs []*machineDoc
229+ if err := query.All(&mdocs); err != nil {
230+ assignContextf(&err, u, context)
231+ return nil, err
232+ }
233+ var unprovisioned []*Machine
234+ var instances []instance.Id
235+ instanceMachines := make(map[instance.Id]*Machine)
236+ for _, mdoc := range mdocs {
237+ m := newMachine(u.st, mdoc)
238+ instance, err := m.InstanceId()
239+ if IsNotProvisionedError(err) {
240+ unprovisioned = append(unprovisioned, m)
241+ } else if err != nil {
242+ assignContextf(&err, u, context)
243+ return nil, err
244+ } else {
245+ instances = append(instances, instance)
246+ instanceMachines[instance] = m
247+ }
248+ }
249+
250+ // Filter the list of instances that are suitable for
251+ // distribution, and then map them back to machines.
252+ //
253+ // TODO(axw) 2014-03-27 #XXXXXX
254+ // Shuffle machines to reduce likelihood of collisions.
255+ // The partition of provisioned/unprovisioned machines
256+ // must be maintained.
257+ if instances, err = u.distribute(instances); err != nil {
258+ assignContextf(&err, u, context)
259+ return nil, err
260+ }
261+ machines := make([]*Machine, len(instances), len(instances)+len(unprovisioned))
262+ for i, instance := range instances {
263+ m, ok := instanceMachines[instance]
264+ if !ok {
265+ err := fmt.Errorf("invalid instance returned: %v", instance)
266+ assignContextf(&err, u, context)
267+ return nil, err
268+ }
269+ machines[i] = m
270+ }
271+ machines = append(machines, unprovisioned...)
272+
273+ // TODO(axw) 2014-03-27 #XXXXXX
274+ // We should not select a machine that is in the process
275+ // of being provisioned. There's no point asserting that
276+ // the machine hasn't been provisioned, as there'll still
277+ // be a period of time during which the machine may be
278+ // provisioned without the fact having yet been recorded
279+ // in state.
280+ for _, m := range machines {
281 err := u.assignToMachine(m, true)
282 if err == nil {
283 return m, nil
284@@ -1242,10 +1291,6 @@
285 return nil, err
286 }
287 }
288- if err := iter.Err(); err != nil {
289- assignContextf(&err, u, context)
290- return nil, err
291- }
292 return nil, noCleanMachines
293 }
294

Subscribers

People subscribed via source and target branches

to status/vote changes: