Merge lp:~allenap/juju-core/break-out-juju-store into lp:~juju/juju-core/trunk

Proposed by Gavin Panella
Status: Merged
Approved by: William Reade
Approved revision: no longer in the source branch.
Merged at revision: 886
Proposed branch: lp:~allenap/juju-core/break-out-juju-store
Merge into: lp:~juju/juju-core/trunk
Diff against target: 2667 lines (+0/-2600)
13 files modified
cmd/charmd/config.yaml (+0/-2)
cmd/charmd/main.go (+0/-74)
cmd/charmload/config.yaml (+0/-1)
cmd/charmload/main.go (+0/-75)
store/branch.go (+0/-152)
store/branch_test.go (+0/-238)
store/lpad.go (+0/-113)
store/lpad_test.go (+0/-68)
store/mgo_test.go (+0/-95)
store/server.go (+0/-191)
store/server_test.go (+0/-209)
store/store.go (+0/-774)
store/store_test.go (+0/-608)
To merge this branch: bzr merge lp:~allenap/juju-core/break-out-juju-store
Reviewer Review Type Date Requested Status
The Go Language Gophers Pending
Review via email: mp+142564@code.launchpad.net

Description of the change

Break out store and cmd/charm* into lp:juju-store

Red Squad are going to be working on the charm store, and it was
suggested that an early task would be to split the charm store out
into a separate project. That work has already been done - see
lp:juju-store - and this is the clean-up job.

mgz helped me a lot in doing both these tasks.

Fwiw, juju-store has not been advertised, so feel free to suggest a
different name.

https://codereview.appspot.com/7058063/

To post a comment you must log in.
Revision history for this message
Gavin Panella (allenap) wrote :

Reviewers: mp+142564_code.launchpad.net,

Message:
Please take a look.

Description:
Break out store and cmd/charm* into lp:juju-store

Red Squad are going to be working on the charm store, and it was
suggested that an early task would be to split the charm store out
into a separate project. That work has already been done - see
lp:juju-store - and this is the clean-up job.

mgz helped me a lot in doing both these tasks.

Fwiw, juju-store has not been advertised, so feel free to suggest a
different name.

https://code.launchpad.net/~allenap/juju-core/break-out-juju-store/+merge/142564

(do not edit description out of merge proposal)

Please review this at https://codereview.appspot.com/7058063/

Affected files:
   A [revision details]
   D cmd/charmd/config.yaml
   D cmd/charmd/main.go
   D cmd/charmload/config.yaml
   D cmd/charmload/main.go
   D store/branch.go
   D store/branch_test.go
   D store/lpad.go
   D store/lpad_test.go
   D store/mgo_test.go
   D store/server.go
   D store/server_test.go
   D store/store.go
   D store/store_test.go

Revision history for this message
William Reade (fwereade) wrote :

On 2013/01/09 17:59:58, allenap wrote:
> Please take a look.

LGTM; I'm +1 on juju-store myself.

https://codereview.appspot.com/7058063/

Revision history for this message
William Reade (fwereade) wrote :

Note that https://code.launchpad.net/~dave-cheney/juju-core/068-CONTRIBUTING/+merge/143057 merges a bug fix; please ensure it makes it across to lp:juju-store.

Revision history for this message
Gavin Panella (allenap) wrote :

Reopening. This consensus from Austin is that the store should be broken out into a separate project.

Revision history for this message
William Reade (fwereade) wrote :

As long as the latest version of store is taken, and any bugs are moved, I'm +1 on just going ahead and doing this.

Revision history for this message
Gavin Panella (allenap) wrote :

William, would you be able to land this please? I don't have the necessary fu. However, before you do, do you know if this will break any service deployment scripts for store.juju.ubuntu.com (if I remember the hostname correctly)?

Revision history for this message
Gavin Panella (allenap) wrote :

I forgot to mention: lp:juju-store has the latest store code from lp:juju-core, and I'm happy to find and move all the bugs over when this lands.

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== removed directory 'cmd/charmd'
=== removed file 'cmd/charmd/config.yaml'
--- cmd/charmd/config.yaml 2012-04-02 15:05:40 +0000
+++ cmd/charmd/config.yaml 1970-01-01 00:00:00 +0000
@@ -1,2 +0,0 @@
1mongo-url: localhost:60017
2api-addr: localhost:8080
30
=== removed file 'cmd/charmd/main.go'
--- cmd/charmd/main.go 2012-06-21 20:40:39 +0000
+++ cmd/charmd/main.go 1970-01-01 00:00:00 +0000
@@ -1,74 +0,0 @@
1package main
2
3import (
4 "fmt"
5 "io/ioutil"
6 "launchpad.net/goyaml"
7 "launchpad.net/juju-core/log"
8 "launchpad.net/juju-core/store"
9 stdlog "log"
10 "net/http"
11 "os"
12 "path/filepath"
13)
14
15func main() {
16 log.Target = stdlog.New(os.Stdout, "", stdlog.LstdFlags)
17 err := serve()
18 if err != nil {
19 fmt.Fprintf(os.Stderr, "%v\n", err)
20 os.Exit(1)
21 }
22}
23
24type config struct {
25 MongoURL string `yaml:"mongo-url"`
26 APIAddr string `yaml:"api-addr"`
27}
28
29func readConfig(path string, conf interface{}) error {
30 f, err := os.Open(path)
31 if err != nil {
32 return fmt.Errorf("opening config file: %v", err)
33 }
34 data, err := ioutil.ReadAll(f)
35 f.Close()
36 if err != nil {
37 return fmt.Errorf("reading config file: %v", err)
38 }
39 err = goyaml.Unmarshal(data, conf)
40 if err != nil {
41 return fmt.Errorf("processing config file: %v", err)
42 }
43 return nil
44}
45
46func serve() error {
47 var confPath string
48 if len(os.Args) == 2 {
49 if _, err := os.Stat(os.Args[1]); err == nil {
50 confPath = os.Args[1]
51 }
52 }
53 if confPath == "" {
54 return fmt.Errorf("usage: %s <config path>", filepath.Base(os.Args[0]))
55 }
56 var conf config
57 err := readConfig(confPath, &conf)
58 if err != nil {
59 return err
60 }
61 if conf.MongoURL == "" || conf.APIAddr == "" {
62 return fmt.Errorf("missing mongo-url or api-addr in config file")
63 }
64 s, err := store.Open(conf.MongoURL)
65 if err != nil {
66 return err
67 }
68 defer s.Close()
69 server, err := store.NewServer(s)
70 if err != nil {
71 return err
72 }
73 return http.ListenAndServe(conf.APIAddr, server)
74}
750
=== removed directory 'cmd/charmload'
=== removed file 'cmd/charmload/config.yaml'
--- cmd/charmload/config.yaml 2012-04-02 15:05:40 +0000
+++ cmd/charmload/config.yaml 1970-01-01 00:00:00 +0000
@@ -1,1 +0,0 @@
1mongo-url: localhost:60017
20
=== removed file 'cmd/charmload/main.go'
--- cmd/charmload/main.go 2012-06-21 20:40:39 +0000
+++ cmd/charmload/main.go 1970-01-01 00:00:00 +0000
@@ -1,75 +0,0 @@
1package main
2
3import (
4 "fmt"
5 "io/ioutil"
6 "launchpad.net/goyaml"
7 "launchpad.net/juju-core/log"
8 "launchpad.net/juju-core/store"
9 "launchpad.net/lpad"
10 stdlog "log"
11 "os"
12 "path/filepath"
13)
14
15func main() {
16 log.Target = stdlog.New(os.Stdout, "", stdlog.LstdFlags)
17 err := load()
18 if err != nil {
19 fmt.Fprintf(os.Stderr, "%v\n", err)
20 os.Exit(1)
21 }
22}
23
24type config struct {
25 MongoURL string `yaml:"mongo-url"`
26}
27
28func readConfig(path string, conf interface{}) error {
29 f, err := os.Open(path)
30 if err != nil {
31 return fmt.Errorf("opening config file: %v", err)
32 }
33 data, err := ioutil.ReadAll(f)
34 f.Close()
35 if err != nil {
36 return fmt.Errorf("reading config file: %v", err)
37 }
38 err = goyaml.Unmarshal(data, conf)
39 if err != nil {
40 return fmt.Errorf("processing config file: %v", err)
41 }
42 return nil
43}
44
45func load() error {
46 var confPath string
47 if len(os.Args) == 2 {
48 if _, err := os.Stat(os.Args[1]); err == nil {
49 confPath = os.Args[1]
50 }
51 }
52 if confPath == "" {
53 return fmt.Errorf("usage: %s <config path>", filepath.Base(os.Args[0]))
54 }
55 var conf config
56 err := readConfig(confPath, &conf)
57 if err != nil {
58 return err
59 }
60 if conf.MongoURL == "" {
61 return fmt.Errorf("missing mongo-url in config file")
62 }
63 s, err := store.Open(conf.MongoURL)
64 if err != nil {
65 return err
66 }
67 defer s.Close()
68 err = store.PublishCharmsDistro(s, lpad.Production)
69 if _, ok := err.(store.PublishBranchErrors); ok {
70 // Ignore branch errors since they're commonplace here.
71 // They're logged, though.
72 return nil
73 }
74 return err
75}
760
=== removed directory 'store'
=== removed file 'store/branch.go'
--- store/branch.go 2012-07-24 09:36:51 +0000
+++ store/branch.go 1970-01-01 00:00:00 +0000
@@ -1,152 +0,0 @@
1package store
2
3import (
4 "bytes"
5 "fmt"
6 "io/ioutil"
7 "launchpad.net/juju-core/charm"
8 "os"
9 "os/exec"
10 "path/filepath"
11 "strings"
12)
13
14// PublishBazaarBranch checks out the Bazaar branch from burl and
15// publishes its latest revision at urls in the given store.
16// The digest parameter must be the most recent known Bazaar
17// revision id for the branch tip. If publishing this specific digest
18// for these URLs has been attempted already, the publishing
19// procedure may abort early. The published digest is the Bazaar
20// revision id of the checked out branch's tip, though, which may
21// differ from the digest parameter.
22func PublishBazaarBranch(store *Store, urls []*charm.URL, burl string, digest string) error {
23
24 // Prevent other publishers from updating these specific URLs
25 // concurrently.
26 lock, err := store.LockUpdates(urls)
27 if err != nil {
28 return err
29 }
30 defer lock.Unlock()
31
32 var branchDir string
33NewTip:
34 // Prepare the charm publisher. This will compute the revision
35 // to be assigned to the charm, and it will also fail if the
36 // operation is unnecessary because charms are up-to-date.
37 pub, err := store.CharmPublisher(urls, digest)
38 if err != nil {
39 return err
40 }
41
42 // Figure if publishing this charm was already attempted before and
43 // failed. We won't try again endlessly if so. In the future we may
44 // retry automatically in certain circumstances.
45 event, err := store.CharmEvent(urls[0], digest)
46 if err == nil && event.Kind != EventPublished {
47 return fmt.Errorf("charm publishing previously failed: %s", strings.Join(event.Errors, "; "))
48 } else if err != nil && err != ErrNotFound {
49 return err
50 }
51
52 if branchDir == "" {
53 // Retrieve the branch with a lightweight checkout, so that it
54 // builds a working tree as cheaply as possible. History
55 // doesn't matter here.
56 tempDir, err := ioutil.TempDir("", "publish-branch-")
57 if err != nil {
58 return err
59 }
60 defer os.RemoveAll(tempDir)
61 branchDir = filepath.Join(tempDir, "branch")
62 output, err := exec.Command("bzr", "checkout", "--lightweight", burl, branchDir).CombinedOutput()
63 if err != nil {
64 return outputErr(output, err)
65 }
66
67 // Pick actual digest from tip. Publishing the real tip
68 // revision rather than the revision for the digest provided is
69 // strictly necessary to prevent a race condition. If the
70 // provided digest was published instead, there's a chance
71 // another publisher concurrently running could have found a
72 // newer revision and published that first, and the digest
73 // parameter provided is in fact an old version that would
74 // overwrite the new version.
75 tipDigest, err := bzrRevisionId(branchDir)
76 if err != nil {
77 return err
78 }
79 if tipDigest != digest {
80 digest = tipDigest
81 goto NewTip
82 }
83 }
84
85 ch, err := charm.ReadDir(branchDir)
86 if err == nil {
87 // Hand over the charm to the store for bundling and
88 // streaming its content into the database.
89 err = pub.Publish(ch)
90 if err == ErrUpdateConflict {
91 // A conflict may happen in edge cases if the whole
92 // locking mechanism fails due to an expiration event,
93 // and then the expired concurrent publisher revives
94 // for whatever reason and attempts to finish
95 // publishing. The state of the system is still
96 // consistent in that case, and the error isn't logged
97 // since the revision was properly published before.
98 return err
99 }
100 }
101
102 // Publishing is done. Log failure or error.
103 event = &CharmEvent{
104 URLs: urls,
105 Digest: digest,
106 }
107 if err == nil {
108 event.Kind = EventPublished
109 event.Revision = pub.Revision()
110 } else {
111 event.Kind = EventPublishError
112 event.Errors = []string{err.Error()}
113 }
114 if logerr := store.LogCharmEvent(event); logerr != nil {
115 if err == nil {
116 err = logerr
117 } else {
118 err = fmt.Errorf("%v; %v", err, logerr)
119 }
120 }
121 return err
122}
123
124// bzrRevisionId returns the Bazaar revision id for the branch in branchDir.
125func bzrRevisionId(branchDir string) (string, error) {
126 cmd := exec.Command("bzr", "revision-info")
127 cmd.Dir = branchDir
128 stderr := &bytes.Buffer{}
129 cmd.Stderr = stderr
130 output, err := cmd.Output()
131 if err != nil {
132 output = append(output, '\n')
133 output = append(output, stderr.Bytes()...)
134 return "", outputErr(output, err)
135 }
136 pair := bytes.Fields(output)
137 if len(pair) != 2 {
138 output = append(output, '\n')
139 output = append(output, stderr.Bytes()...)
140 return "", fmt.Errorf(`invalid output from "bzr revision-info": %s`, output)
141 }
142 return string(pair[1]), nil
143}
144
145// outputErr returns an error that assembles some command's output and its
146// error, if both output and err are set, and returns only err if output is nil.
147func outputErr(output []byte, err error) error {
148 if len(output) > 0 {
149 return fmt.Errorf("%v\n%s", err, output)
150 }
151 return err
152}
1530
=== removed file 'store/branch_test.go'
--- store/branch_test.go 2012-10-09 01:19:01 +0000
+++ store/branch_test.go 1970-01-01 00:00:00 +0000
@@ -1,238 +0,0 @@
1package store_test
2
3import (
4 "bytes"
5 "fmt"
6 "io/ioutil"
7 . "launchpad.net/gocheck"
8 "launchpad.net/juju-core/charm"
9 "launchpad.net/juju-core/store"
10 "launchpad.net/juju-core/testing"
11 "os"
12 "os/exec"
13 "path/filepath"
14 "strings"
15 "time"
16)
17
18func (s *StoreSuite) dummyBranch(c *C, suffix string) bzrDir {
19 tmpDir := c.MkDir()
20 if suffix != "" {
21 tmpDir = filepath.Join(tmpDir, suffix)
22 err := os.MkdirAll(tmpDir, 0755)
23 c.Assert(err, IsNil)
24 }
25 branch := bzrDir(tmpDir)
26 branch.init()
27
28 copyCharmDir(branch.path(), testing.Charms.Dir("series", "dummy"))
29 branch.add()
30 branch.commit("Imported charm.")
31 return branch
32}
33
34var urls = []*charm.URL{
35 charm.MustParseURL("cs:~joe/oneiric/dummy"),
36 charm.MustParseURL("cs:oneiric/dummy"),
37}
38
39type fakePlugin struct {
40 oldEnv string
41}
42
43func (p *fakePlugin) install(dir string, content string) {
44 p.oldEnv = os.Getenv("BZR_PLUGINS_AT")
45 err := ioutil.WriteFile(filepath.Join(dir, "__init__.py"), []byte(content), 0644)
46 if err != nil {
47 panic(err)
48 }
49 os.Setenv("BZR_PLUGINS_AT", "fakePlugin@"+dir)
50}
51
52func (p *fakePlugin) uninstall() {
53 os.Setenv("BZR_PLUGINS_AT", p.oldEnv)
54}
55
56func (s *StoreSuite) TestPublish(c *C) {
57 branch := s.dummyBranch(c, "")
58
59 // Ensure that the streams are parsed separately by inserting
60 // garbage on stderr. The wanted information is still there.
61 plugin := fakePlugin{}
62 plugin.install(c.MkDir(), `import sys; sys.stderr.write("STDERR STUFF FROM TEST\n")`)
63 defer plugin.uninstall()
64
65 err := store.PublishBazaarBranch(s.store, urls, branch.path(), "wrong-rev")
66 c.Assert(err, IsNil)
67
68 for _, url := range urls {
69 info, rc, err := s.store.OpenCharm(url)
70 c.Assert(err, IsNil)
71 defer rc.Close()
72 c.Assert(info.Revision(), Equals, 0)
73 c.Assert(info.Meta().Name, Equals, "dummy")
74
75 data, err := ioutil.ReadAll(rc)
76 c.Assert(err, IsNil)
77
78 bundle, err := charm.ReadBundleBytes(data)
79 c.Assert(err, IsNil)
80 c.Assert(bundle.Revision(), Equals, 0)
81 c.Assert(bundle.Meta().Name, Equals, "dummy")
82 }
83
84 // Attempt to publish the same content again while providing the wrong
85 // tip revision. It must pick the real revision from the branch and
86 // note this was previously published.
87 err = store.PublishBazaarBranch(s.store, urls, branch.path(), "wrong-rev")
88 c.Assert(err, Equals, store.ErrRedundantUpdate)
89
90 // Bump the content revision and lie again about the known tip revision.
91 // This time, though, pretend it's the same as the real branch revision
92 // previously published. It must error and not publish the new revision
93 // because it will use the revision provided as a parameter to check if
94 // publishing was attempted before. This is the mechanism that enables
95 // stopping fast without having to download every single branch. Real
96 // revision is picked in the next scan.
97 digest1 := branch.digest()
98 branch.change()
99 err = store.PublishBazaarBranch(s.store, urls, branch.path(), digest1)
100 c.Assert(err, Equals, store.ErrRedundantUpdate)
101
102 // Now allow it to publish the new content by providing an unseen revision.
103 err = store.PublishBazaarBranch(s.store, urls, branch.path(), "wrong-rev")
104 c.Assert(err, IsNil)
105 digest2 := branch.digest()
106
107 info, err := s.store.CharmInfo(urls[0])
108 c.Assert(err, IsNil)
109 c.Assert(info.Revision(), Equals, 1)
110 c.Assert(info.Meta().Name, Equals, "dummy")
111
112 // There are two events published, for each of the successful attempts.
113 // The failures are ignored given that they are artifacts of the
114 // publishing mechanism rather than actual problems.
115 _, err = s.store.CharmEvent(urls[0], "wrong-rev")
116 c.Assert(err, Equals, store.ErrNotFound)
117 for i, digest := range []string{digest1, digest2} {
118 event, err := s.store.CharmEvent(urls[0], digest)
119 c.Assert(err, IsNil)
120 c.Assert(event.Kind, Equals, store.EventPublished)
121 c.Assert(event.Revision, Equals, i)
122 c.Assert(event.Errors, IsNil)
123 c.Assert(event.Warnings, IsNil)
124 }
125}
126
127func (s *StoreSuite) TestPublishErrorFromBzr(c *C) {
128 branch := s.dummyBranch(c, "")
129
130 // In TestPublish we ensure that the streams are parsed
131 // separately by inserting garbage on stderr. Now make
132 // sure that stderr isn't simply trashed, as we want to
133 // know about what a real error tells us.
134 plugin := fakePlugin{}
135 plugin.install(c.MkDir(), `import sys; sys.stderr.write("STDERR STUFF FROM TEST\n"); sys.exit(1)`)
136 defer plugin.uninstall()
137
138 err := store.PublishBazaarBranch(s.store, urls, branch.path(), "wrong-rev")
139 c.Assert(err, ErrorMatches, "(?s).*STDERR STUFF.*")
140}
141
142func (s *StoreSuite) TestPublishErrorInCharm(c *C) {
143 branch := s.dummyBranch(c, "")
144
145 // Corrupt the charm.
146 branch.remove("metadata.yaml")
147 branch.commit("Removed metadata.yaml.")
148
149 // Attempt to publish the erroneous content.
150 err := store.PublishBazaarBranch(s.store, urls, branch.path(), "wrong-rev")
151 c.Assert(err, ErrorMatches, ".*/metadata.yaml: no such file or directory")
152
153 // The event should be logged as well, since this was an error in the charm
154 // that won't go away and must be communicated to the author.
155 event, err := s.store.CharmEvent(urls[0], branch.digest())
156 c.Assert(err, IsNil)
157 c.Assert(event.Kind, Equals, store.EventPublishError)
158 c.Assert(event.Revision, Equals, 0)
159 c.Assert(event.Errors, NotNil)
160 c.Assert(event.Errors[0], Matches, ".*/metadata.yaml: no such file or directory")
161 c.Assert(event.Warnings, IsNil)
162}
163
164type bzrDir string
165
166func (dir bzrDir) path(args ...string) string {
167 return filepath.Join(append([]string{string(dir)}, args...)...)
168}
169
170func (dir bzrDir) run(args ...string) []byte {
171 cmd := exec.Command("bzr", args...)
172 oldemail := os.Getenv("EMAIL")
173 defer os.Setenv("EMAIL", oldemail)
174 // bzr will complain if bzr whoami has not been run previously,
175 // avoid this by passing $EMAIL into the environment.
176 os.Setenv("EMAIL", "nobody@example.com")
177 cmd.Dir = string(dir)
178 output, err := cmd.Output()
179 if err != nil {
180 panic(fmt.Sprintf("command failed: bzr %s\n%s", strings.Join(args, " "), output))
181 }
182 return output
183}
184
185func (dir bzrDir) init() {
186 dir.run("init")
187}
188
189func (dir bzrDir) add(paths ...string) {
190 dir.run(append([]string{"add"}, paths...)...)
191}
192
193func (dir bzrDir) remove(paths ...string) {
194 dir.run(append([]string{"rm"}, paths...)...)
195}
196
197func (dir bzrDir) commit(msg string) {
198 dir.run("commit", "-m", msg)
199}
200
201func (dir bzrDir) write(path string, data string) {
202 err := ioutil.WriteFile(dir.path(path), []byte(data), 0644)
203 if err != nil {
204 panic(err)
205 }
206}
207
208func (dir bzrDir) change() {
209 t := time.Now().String()
210 dir.write("timestamp", t)
211 dir.add("timestamp")
212 dir.commit("Revision bumped at " + t)
213}
214
215func (dir bzrDir) digest() string {
216 output := dir.run("revision-info")
217 f := bytes.Fields(output)
218 if len(f) != 2 {
219 panic("revision-info returned bad output: " + string(output))
220 }
221 return string(f[1])
222}
223
224func copyCharmDir(dst string, dir *charm.Dir) {
225 var b bytes.Buffer
226 err := dir.BundleTo(&b)
227 if err != nil {
228 panic(err)
229 }
230 bundle, err := charm.ReadBundleBytes(b.Bytes())
231 if err != nil {
232 panic(err)
233 }
234 err = bundle.ExpandTo(dst)
235 if err != nil {
236 panic(err)
237 }
238}
2390
=== removed file 'store/lpad.go'
--- store/lpad.go 2012-06-21 20:40:39 +0000
+++ store/lpad.go 1970-01-01 00:00:00 +0000
@@ -1,113 +0,0 @@
1package store
2
3import (
4 "fmt"
5 "launchpad.net/juju-core/charm"
6 "launchpad.net/juju-core/log"
7 "launchpad.net/lpad"
8 "strings"
9 "time"
10)
11
12type PublishBranchError struct {
13 URL string
14 Err error
15}
16
17type PublishBranchErrors []PublishBranchError
18
19func (errs PublishBranchErrors) Error() string {
20 return fmt.Sprintf("%d branch(es) failed to be published", len(errs))
21}
22
23// PublishCharmsDistro publishes all branch tips found in
24// the /charms distribution in Launchpad onto store under
25// the "cs:" scheme.
26// apiBase specifies the Launchpad base API URL, such
27// as lpad.Production or lpad.Staging.
28// Errors found while processing one or more branches are
29// all returned as a PublishBranchErrors value.
30func PublishCharmsDistro(store *Store, apiBase lpad.APIBase) error {
31 oauth := &lpad.OAuth{Anonymous: true, Consumer: "juju"}
32 root, err := lpad.Login(apiBase, oauth)
33 if err != nil {
34 return err
35 }
36 distro, err := root.Distro("charms")
37 if err != nil {
38 return err
39 }
40 tips, err := distro.BranchTips(time.Time{})
41 if err != nil {
42 return err
43 }
44
45 var errs PublishBranchErrors
46 for _, tip := range tips {
47 if !strings.HasSuffix(tip.UniqueName, "/trunk") {
48 continue
49 }
50 burl, curl, err := uniqueNameURLs(tip.UniqueName)
51 if err != nil {
52 errs = append(errs, PublishBranchError{tip.UniqueName, err})
53 log.Printf("error: %v\n", err)
54 continue
55 }
56 log.Printf("----- %s\n", burl)
57 if tip.Revision == "" {
58 errs = append(errs, PublishBranchError{burl, fmt.Errorf("branch has no revisions")})
59 log.Printf("error: branch has no revisions\n")
60 continue
61 }
62 // Charm is published in the personal URL and in any explicitly
63 // assigned official series.
64 urls := []*charm.URL{curl}
65 schema, name := curl.Schema, curl.Name
66 for _, series := range tip.OfficialSeries {
67 curl = &charm.URL{Schema: schema, Name: name, Series: series, Revision: -1}
68 curl.Series = series
69 curl.User = ""
70 urls = append(urls, curl)
71 }
72
73 err = PublishBazaarBranch(store, urls, burl, tip.Revision)
74 if err == ErrRedundantUpdate {
75 continue
76 }
77 if err != nil {
78 errs = append(errs, PublishBranchError{burl, err})
79 log.Printf("error: %v\n", err)
80 }
81 }
82 if errs != nil {
83 return errs
84 }
85 return nil
86}
87
88// uniqueNameURLs returns the branch URL and the charm URL for the
89// provided Launchpad branch unique name. The unique name must be
90// in the form:
91//
92// ~<user>/charms/<series>/<charm name>/trunk
93//
94// For testing purposes, if name has a prefix preceding a string in
95// this format, the prefix is stripped out for computing the charm
96// URL, and the unique name is returned unchanged as the branch URL.
97func uniqueNameURLs(name string) (burl string, curl *charm.URL, err error) {
98 u := strings.Split(name, "/")
99 if len(u) > 5 {
100 u = u[len(u)-5:]
101 burl = name
102 } else {
103 burl = "lp:" + name
104 }
105 if len(u) < 5 || u[1] != "charms" || u[4] != "trunk" || len(u[0]) == 0 || u[0][0] != '~' {
106 return "", nil, fmt.Errorf("unwanted branch name: %s", name)
107 }
108 curl, err = charm.ParseURL(fmt.Sprintf("cs:%s/%s/%s", u[0], u[2], u[3]))
109 if err != nil {
110 return "", nil, err
111 }
112 return burl, curl, nil
113}
1140
=== removed file 'store/lpad_test.go'
--- store/lpad_test.go 2012-08-18 22:48:02 +0000
+++ store/lpad_test.go 1970-01-01 00:00:00 +0000
@@ -1,68 +0,0 @@
1package store_test
2
3import (
4 "fmt"
5 . "launchpad.net/gocheck"
6 "launchpad.net/juju-core/charm"
7 "launchpad.net/juju-core/store"
8 "launchpad.net/juju-core/testing"
9 "launchpad.net/lpad"
10)
11
12var jsonType = map[string]string{
13 "Content-Type": "application/json",
14}
15
16func (s *StoreSuite) TestPublishCharmDistro(c *C) {
17 branch := s.dummyBranch(c, "~joe/charms/oneiric/dummy/trunk")
18
19 // The Distro call will look for bare /charms, first.
20 testing.Server.Response(200, jsonType, []byte("{}"))
21
22 // And then it picks up the tips.
23 data := fmt.Sprintf(`[`+
24 `["file://%s", "rev1", ["oneiric", "precise"]],`+
25 `["file://%s", "%s", []],`+
26 `["file:///non-existent/~jeff/charms/precise/bad/trunk", "rev2", []],`+
27 `["file:///non-existent/~jeff/charms/precise/bad/skip-me", "rev3", []]`+
28 `]`,
29 branch.path(), branch.path(), branch.digest())
30 testing.Server.Response(200, jsonType, []byte(data))
31
32 apiBase := lpad.APIBase(testing.Server.URL)
33 err := store.PublishCharmsDistro(s.store, apiBase)
34
35 // Should have a single failure from the trunk branch that doesn't
36 // exist. The redundant update with the known digest should be
37 // ignored, and skip-me isn't a supported branch name so it's
38 // ignored as well.
39 c.Assert(err, ErrorMatches, `1 branch\(es\) failed to be published`)
40 berr := err.(store.PublishBranchErrors)[0]
41 c.Assert(berr.URL, Equals, "file:///non-existent/~jeff/charms/precise/bad/trunk")
42 c.Assert(berr.Err, ErrorMatches, "(?s).*bzr: ERROR: Not a branch.*")
43
44 for _, url := range []string{"cs:oneiric/dummy", "cs:precise/dummy-0", "cs:~joe/oneiric/dummy-0"} {
45 dummy, err := s.store.CharmInfo(charm.MustParseURL(url))
46 c.Assert(err, IsNil)
47 c.Assert(dummy.Meta().Name, Equals, "dummy")
48 }
49
50 // The known digest should have been ignored, so revision is still at 0.
51 _, err = s.store.CharmInfo(charm.MustParseURL("cs:~joe/oneiric/dummy-1"))
52 c.Assert(err, Equals, store.ErrNotFound)
53
54 // bare /charms lookup
55 req := testing.Server.WaitRequest()
56 c.Assert(req.Method, Equals, "GET")
57 c.Assert(req.URL.Path, Equals, "/charms")
58
59 // tips request
60 req = testing.Server.WaitRequest()
61 c.Assert(req.Method, Equals, "GET")
62 c.Assert(req.URL.Path, Equals, "/charms")
63 c.Assert(req.Form["ws.op"], DeepEquals, []string{"getBranchTips"})
64 c.Assert(req.Form["since"], IsNil)
65
66 // Request must be signed by juju.
67 c.Assert(req.Header.Get("Authorization"), Matches, `.*oauth_consumer_key="juju".*`)
68}
690
=== removed file 'store/mgo_test.go'
--- store/mgo_test.go 2012-10-30 11:45:32 +0000
+++ store/mgo_test.go 1970-01-01 00:00:00 +0000
@@ -1,95 +0,0 @@
1package store_test
2
3import (
4 "bytes"
5 "labix.org/v2/mgo"
6 . "launchpad.net/gocheck"
7 "os/exec"
8 "time"
9)
10
11// ----------------------------------------------------------------------------
12// The mgo test suite
13
14type MgoSuite struct {
15 Addr string
16 Session *mgo.Session
17 output bytes.Buffer
18 server *exec.Cmd
19}
20
21func (s *MgoSuite) SetUpSuite(c *C) {
22 mgo.SetDebug(true)
23 mgo.SetStats(true)
24 dbdir := c.MkDir()
25 args := []string{
26 "--dbpath", dbdir,
27 "--bind_ip", "127.0.0.1",
28 "--port", "50017",
29 "--nssize", "1",
30 "--noprealloc",
31 "--smallfiles",
32 "--nojournal",
33 }
34 s.server = exec.Command("mongod", args...)
35 s.server.Stdout = &s.output
36 s.server.Stderr = &s.output
37 err := s.server.Start()
38 c.Assert(err, IsNil)
39}
40
41func (s *MgoSuite) TearDownSuite(c *C) {
42 s.server.Process.Kill()
43 s.server.Process.Wait()
44}
45
46func (s *MgoSuite) SetUpTest(c *C) {
47 err := DropAll("localhost:50017")
48 c.Assert(err, IsNil)
49 mgo.SetLogger(c)
50 mgo.ResetStats()
51 s.Addr = "127.0.0.1:50017"
52 s.Session, err = mgo.Dial(s.Addr)
53 c.Assert(err, IsNil)
54}
55
56func (s *MgoSuite) TearDownTest(c *C) {
57 if s.Session != nil {
58 s.Session.Close()
59 }
60 for i := 0; ; i++ {
61 stats := mgo.GetStats()
62 if stats.SocketsInUse == 0 && stats.SocketsAlive == 0 {
63 break
64 }
65 if i == 20 {
66 c.Fatal("Test left sockets in a dirty state")
67 }
68 c.Logf("Waiting for sockets to die: %d in use, %d alive", stats.SocketsInUse, stats.SocketsAlive)
69 time.Sleep(500 * time.Millisecond)
70 }
71}
72
73func DropAll(mongourl string) (err error) {
74 session, err := mgo.Dial(mongourl)
75 if err != nil {
76 return err
77 }
78 defer session.Close()
79
80 names, err := session.DatabaseNames()
81 if err != nil {
82 return err
83 }
84 for _, name := range names {
85 switch name {
86 case "admin", "local", "config":
87 default:
88 err = session.DB(name).DropDatabase()
89 if err != nil {
90 return err
91 }
92 }
93 }
94 return nil
95}
960
=== removed file 'store/server.go'
--- store/server.go 2012-10-11 14:52:21 +0000
+++ store/server.go 1970-01-01 00:00:00 +0000
@@ -1,191 +0,0 @@
1package store
2
3import (
4 "encoding/json"
5 "io"
6 "launchpad.net/juju-core/charm"
7 "launchpad.net/juju-core/log"
8 "net/http"
9 "strconv"
10 "strings"
11)
12
13// Server is an http.Handler that serves the HTTP API of juju
14// so that juju clients can retrieve published charms.
15type Server struct {
16 store *Store
17 mux *http.ServeMux
18}
19
20// New returns a new *Server using store.
21func NewServer(store *Store) (*Server, error) {
22 s := &Server{
23 store: store,
24 mux: http.NewServeMux(),
25 }
26 s.mux.HandleFunc("/charm-info", func(w http.ResponseWriter, r *http.Request) {
27 s.serveInfo(w, r)
28 })
29 s.mux.HandleFunc("/charm/", func(w http.ResponseWriter, r *http.Request) {
30 s.serveCharm(w, r)
31 })
32 s.mux.HandleFunc("/stats/counter/", func(w http.ResponseWriter, r *http.Request) {
33 s.serveStats(w, r)
34 })
35
36 // This is just a validation key to allow blitz.io to run
37 // performance tests against the site.
38 s.mux.HandleFunc("/mu-35700a31-6bf320ca-a800b670-05f845ee", func(w http.ResponseWriter, r *http.Request) {
39 s.serveBlitzKey(w, r)
40 })
41 return s, nil
42}
43
44// ServeHTTP serves an http request.
45// This method turns *Server into an http.Handler.
46func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
47 if r.URL.Path == "/" {
48 http.Redirect(w, r, "https://juju.ubuntu.com", http.StatusSeeOther)
49 return
50 }
51 s.mux.ServeHTTP(w, r)
52}
53
54func statsEnabled(req *http.Request) bool {
55 // It's fine to parse the form more than once, and it avoids
56 // bugs from not parsing it.
57 req.ParseForm()
58 return req.Form.Get("stats") != "0"
59}
60
61func charmStatsKey(curl *charm.URL, kind string) []string {
62 if curl.User == "" {
63 return []string{kind, curl.Series, curl.Name}
64 }
65 return []string{kind, curl.Series, curl.Name, curl.User}
66}
67
68func (s *Server) serveInfo(w http.ResponseWriter, r *http.Request) {
69 if r.URL.Path != "/charm-info" {
70 w.WriteHeader(http.StatusNotFound)
71 return
72 }
73 r.ParseForm()
74 response := map[string]*charm.InfoResponse{}
75 for _, url := range r.Form["charms"] {
76 c := &charm.InfoResponse{}
77 response[url] = c
78 curl, err := charm.ParseURL(url)
79 var info *CharmInfo
80 if err == nil {
81 info, err = s.store.CharmInfo(curl)
82 }
83 var skey []string
84 if err == nil {
85 skey = charmStatsKey(curl, "charm-info")
86 c.Sha256 = info.BundleSha256()
87 c.Revision = info.Revision()
88 c.Digest = info.Digest()
89 } else {
90 if err == ErrNotFound {
91 skey = charmStatsKey(curl, "charm-missing")
92 }
93 c.Errors = append(c.Errors, err.Error())
94 }
95 if skey != nil && statsEnabled(r) {
96 go s.store.IncCounter(skey)
97 }
98 }
99 data, err := json.Marshal(response)
100 if err == nil {
101 w.Header().Set("Content-Type", "application/json")
102 _, err = w.Write(data)
103 }
104 if err != nil {
105 log.Printf("store: cannot write content: %v", err)
106 w.WriteHeader(http.StatusInternalServerError)
107 return
108 }
109}
110
111func (s *Server) serveCharm(w http.ResponseWriter, r *http.Request) {
112 if !strings.HasPrefix(r.URL.Path, "/charm/") {
113 panic("serveCharm: bad url")
114 }
115 curl, err := charm.ParseURL("cs:" + r.URL.Path[len("/charm/"):])
116 if err != nil {
117 w.WriteHeader(http.StatusNotFound)
118 return
119 }
120 info, rc, err := s.store.OpenCharm(curl)
121 if err == ErrNotFound {
122 w.WriteHeader(http.StatusNotFound)
123 return
124 }
125 if err != nil {
126 w.WriteHeader(http.StatusInternalServerError)
127 log.Printf("store: cannot open charm %q: %v", curl, err)
128 return
129 }
130 if statsEnabled(r) {
131 go s.store.IncCounter(charmStatsKey(curl, "charm-bundle"))
132 }
133 defer rc.Close()
134 w.Header().Set("Connection", "close") // No keep-alive for now.
135 w.Header().Set("Content-Type", "application/octet-stream")
136 w.Header().Set("Content-Length", strconv.FormatInt(info.BundleSize(), 10))
137 _, err = io.Copy(w, rc)
138 if err != nil {
139 log.Printf("store: failed to stream charm %q: %v", curl, err)
140 }
141}
142
143func (s *Server) serveStats(w http.ResponseWriter, r *http.Request) {
144 // TODO: Adopt a smarter mux that simplifies this logic.
145 const dir = "/stats/counter/"
146 if !strings.HasPrefix(r.URL.Path, dir) {
147 panic("bad url")
148 }
149 base := r.URL.Path[len(dir):]
150 if strings.Index(base, "/") > 0 {
151 w.WriteHeader(http.StatusNotFound)
152 return
153 }
154 if base == "" {
155 w.WriteHeader(http.StatusForbidden)
156 return
157 }
158 key := strings.Split(base, ":")
159 prefix := false
160 if key[len(key)-1] == "*" {
161 prefix = true
162 key = key[:len(key)-1]
163 if len(key) == 0 {
164 // No point in counting something unknown.
165 w.WriteHeader(http.StatusForbidden)
166 return
167 }
168 }
169 r.ParseForm()
170 sum, err := s.store.SumCounter(key, prefix)
171 if err != nil {
172 log.Printf("store: cannot sum counter: %v", err)
173 w.WriteHeader(http.StatusInternalServerError)
174 return
175 }
176 data := []byte(strconv.FormatInt(sum, 10))
177 w.Header().Set("Content-Type", "text/plain")
178 w.Header().Set("Content-Length", strconv.Itoa(len(data)))
179 _, err = w.Write(data)
180 if err != nil {
181 log.Printf("store: cannot write content: %v", err)
182 w.WriteHeader(http.StatusInternalServerError)
183 }
184}
185
186func (s *Server) serveBlitzKey(w http.ResponseWriter, r *http.Request) {
187 w.Header().Set("Connection", "close")
188 w.Header().Set("Content-Type", "text/plain")
189 w.Header().Set("Content-Length", "2")
190 w.Write([]byte("42"))
191}
1920
=== removed file 'store/server_test.go'
--- store/server_test.go 2012-09-05 21:08:26 +0000
+++ store/server_test.go 1970-01-01 00:00:00 +0000
@@ -1,209 +0,0 @@
1package store_test
2
3import (
4 "encoding/json"
5 "io/ioutil"
6 . "launchpad.net/gocheck"
7 "launchpad.net/juju-core/charm"
8 "launchpad.net/juju-core/store"
9 "net/http"
10 "net/http/httptest"
11 "net/url"
12 "strconv"
13 "time"
14)
15
16func (s *StoreSuite) prepareServer(c *C) (*store.Server, *charm.URL) {
17 curl := charm.MustParseURL("cs:oneiric/wordpress")
18 pub, err := s.store.CharmPublisher([]*charm.URL{curl}, "some-digest")
19 c.Assert(err, IsNil)
20 err = pub.Publish(&FakeCharmDir{})
21 c.Assert(err, IsNil)
22
23 server, err := store.NewServer(s.store)
24 c.Assert(err, IsNil)
25 return server, curl
26}
27
28func (s *StoreSuite) TestServerCharmInfo(c *C) {
29 server, curl := s.prepareServer(c)
30 req, err := http.NewRequest("GET", "/charm-info", nil)
31 c.Assert(err, IsNil)
32
33 var tests = []struct{ url, sha, digest, err string }{
34 {curl.String(), fakeRevZeroSha, "some-digest", ""},
35 {"cs:oneiric/non-existent", "", "", "entry not found"},
36 {"cs:bad", "", "", `charm URL without series: "cs:bad"`},
37 }
38
39 for _, t := range tests {
40 req.Form = url.Values{"charms": []string{t.url}}
41 rec := httptest.NewRecorder()
42 server.ServeHTTP(rec, req)
43
44 expected := make(map[string]interface{})
45 if t.sha != "" {
46 expected[t.url] = map[string]interface{}{
47 "revision": float64(0),
48 "sha256": t.sha,
49 "digest": t.digest,
50 }
51 } else {
52 expected[t.url] = map[string]interface{}{
53 "revision": float64(0),
54 "errors": []interface{}{t.err},
55 }
56 }
57 obtained := map[string]interface{}{}
58 err = json.NewDecoder(rec.Body).Decode(&obtained)
59 c.Assert(err, IsNil)
60 c.Assert(obtained, DeepEquals, expected)
61 c.Assert(rec.Header().Get("Content-Type"), Equals, "application/json")
62 }
63
64 s.checkCounterSum(c, []string{"charm-info", curl.Series, curl.Name}, false, 1)
65 s.checkCounterSum(c, []string{"charm-missing", "oneiric", "non-existent"}, false, 1)
66}
67
68// checkCounterSum checks that statistics are properly collected.
69// It retries a few times as they are generally collected in background.
70func (s *StoreSuite) checkCounterSum(c *C, key []string, prefix bool, expected int64) {
71 var sum int64
72 var err error
73 for retry := 0; retry < 10; retry++ {
74 time.Sleep(1e8)
75 sum, err = s.store.SumCounter(key, prefix)
76 c.Assert(err, IsNil)
77 if sum == expected {
78 if expected == 0 && retry < 2 {
79 continue // Wait a bit to make sure.
80 }
81 return
82 }
83 }
84 c.Errorf("counter sum for %#v is %d, want %d", key, sum, expected)
85}
86
87func (s *StoreSuite) TestCharmStreaming(c *C) {
88 server, curl := s.prepareServer(c)
89
90 req, err := http.NewRequest("GET", "/charm/"+curl.String()[3:], nil)
91 c.Assert(err, IsNil)
92 rec := httptest.NewRecorder()
93 server.ServeHTTP(rec, req)
94
95 data, err := ioutil.ReadAll(rec.Body)
96 c.Assert(string(data), Equals, "charm-revision-0")
97
98 c.Assert(rec.Header().Get("Connection"), Equals, "close")
99 c.Assert(rec.Header().Get("Content-Type"), Equals, "application/octet-stream")
100 c.Assert(rec.Header().Get("Content-Length"), Equals, "16")
101
102 // Check that it was accounted for in statistics.
103 s.checkCounterSum(c, []string{"charm-bundle", curl.Series, curl.Name}, false, 1)
104}
105
106func (s *StoreSuite) TestDisableStats(c *C) {
107 server, curl := s.prepareServer(c)
108
109 req, err := http.NewRequest("GET", "/charm-info", nil)
110 c.Assert(err, IsNil)
111 req.Form = url.Values{"charms": []string{curl.String()}, "stats": []string{"0"}}
112 rec := httptest.NewRecorder()
113 server.ServeHTTP(rec, req)
114 c.Assert(rec.Code, Equals, 200)
115
116 req, err = http.NewRequest("GET", "/charm/"+curl.String()[3:], nil)
117 c.Assert(err, IsNil)
118 req.Form = url.Values{"stats": []string{"0"}}
119 rec = httptest.NewRecorder()
120 server.ServeHTTP(rec, req)
121 c.Assert(rec.Code, Equals, 200)
122
123 // No statistics should have been collected given the use of stats=0.
124 for _, prefix := range []string{"charm-info", "charm-bundle", "charm-missing"} {
125 s.checkCounterSum(c, []string{prefix}, true, 0)
126 }
127}
128
129func (s *StoreSuite) TestServerStatus(c *C) {
130 server, err := store.NewServer(s.store)
131 c.Assert(err, IsNil)
132 tests := []struct {
133 path string
134 code int
135 }{
136 {"/charm-info/any", 404},
137 {"/charm/bad-url", 404},
138 {"/charm/bad-series/wordpress", 404},
139 {"/stats/counter/", 403},
140 {"/stats/counter/*", 403},
141 {"/stats/counter/any/", 404},
142 {"/stats/", 404},
143 {"/stats/any", 404},
144 }
145 for _, test := range tests {
146 req, err := http.NewRequest("GET", test.path, nil)
147 c.Assert(err, IsNil)
148 rec := httptest.NewRecorder()
149 server.ServeHTTP(rec, req)
150 c.Assert(rec.Code, Equals, test.code, Commentf("Path: %s", test.path))
151 }
152}
153
154func (s *StoreSuite) TestRootRedirect(c *C) {
155 server, err := store.NewServer(s.store)
156 c.Assert(err, IsNil)
157 req, err := http.NewRequest("GET", "/", nil)
158 c.Assert(err, IsNil)
159 rec := httptest.NewRecorder()
160 server.ServeHTTP(rec, req)
161 c.Assert(rec.Code, Equals, 303)
162 c.Assert(rec.Header().Get("Location"), Equals, "https://juju.ubuntu.com")
163}
164
165func (s *StoreSuite) TestStatsCounter(c *C) {
166 for _, key := range [][]string{{"a", "b"}, {"a", "b"}, {"a"}} {
167 err := s.store.IncCounter(key)
168 c.Assert(err, IsNil)
169 }
170
171 server, _ := s.prepareServer(c)
172
173 expected := map[string]string{
174 "a:b": "2",
175 "a:*": "3",
176 "a": "1",
177 }
178
179 for counter, n := range expected {
180 req, err := http.NewRequest("GET", "/stats/counter/"+counter, nil)
181 c.Assert(err, IsNil)
182 rec := httptest.NewRecorder()
183 server.ServeHTTP(rec, req)
184
185 data, err := ioutil.ReadAll(rec.Body)
186 c.Assert(string(data), Equals, n)
187
188 c.Assert(rec.Header().Get("Content-Type"), Equals, "text/plain")
189 c.Assert(rec.Header().Get("Content-Length"), Equals, strconv.Itoa(len(n)))
190 }
191}
192
193func (s *StoreSuite) TestBlitzKey(c *C) {
194 server, _ := s.prepareServer(c)
195
196 // This is just a validation key to allow blitz.io to run
197 // performance tests against the site.
198 req, err := http.NewRequest("GET", "/mu-35700a31-6bf320ca-a800b670-05f845ee", nil)
199 c.Assert(err, IsNil)
200 rec := httptest.NewRecorder()
201 server.ServeHTTP(rec, req)
202
203 data, err := ioutil.ReadAll(rec.Body)
204 c.Assert(string(data), Equals, "42")
205
206 c.Assert(rec.Header().Get("Connection"), Equals, "close")
207 c.Assert(rec.Header().Get("Content-Type"), Equals, "text/plain")
208 c.Assert(rec.Header().Get("Content-Length"), Equals, "2")
209}
2100
=== removed file 'store/store.go'
--- store/store.go 2012-10-11 17:40:17 +0000
+++ store/store.go 1970-01-01 00:00:00 +0000
@@ -1,774 +0,0 @@
1// The store package is capable of storing and updating charms in a MongoDB
2// database, as well as maintaining further information about them such as
3// the VCS revision the charm was loaded from and the URLs for the charms.
4package store
5
6import (
7 "crypto/sha256"
8 "encoding/hex"
9 "errors"
10 "fmt"
11 "hash"
12 "io"
13 "labix.org/v2/mgo"
14 "labix.org/v2/mgo/bson"
15 "launchpad.net/juju-core/charm"
16 "launchpad.net/juju-core/log"
17 "sort"
18 "strconv"
19 "sync"
20 "time"
21)
22
23// The following MongoDB collections are currently used:
24//
25// juju.events - Log of events relating to the lifecycle of charms
26// juju.charms - Information about the stored charms
27// juju.charmfs.* - GridFS with the charm files
28// juju.locks - Has unique keys with url of updating charms
29// juju.stat.counters - Counters for statistics
30// juju.stat.tokens - Tokens used in statistics counter keys
31
32var (
33 ErrUpdateConflict = errors.New("charm update in progress")
34 ErrRedundantUpdate = errors.New("charm is up-to-date")
35 ErrNotFound = errors.New("entry not found")
36)
37
38const (
39 UpdateTimeout = 600e9
40)
41
42// Store holds a connection to a charm store.
43type Store struct {
44 session *storeSession
45
46 // Cache for statistics key words (two generations).
47 cacheMu sync.RWMutex
48 statsTokenNew map[string]int
49 statsTokenOld map[string]int
50}
51
52// Open creates a new session with the store. It connects to the MongoDB
53// server at the given address (as expected by the Mongo function in the
54// labix.org/v2/mgo package).
55func Open(mongoAddr string) (store *Store, err error) {
56 log.Printf("store: Store opened. Connecting to: %s", mongoAddr)
57 store = &Store{}
58 session, err := mgo.Dial(mongoAddr)
59 if err != nil {
60 log.Printf("store: Error connecting to MongoDB: %v", err)
61 return nil, err
62 }
63
64 store = &Store{session: &storeSession{session}}
65
66 // Ignore error. It'll always fail after created.
67 // TODO Check the error once mgo hands it to us.
68 _ = store.session.DB("juju").Run(bson.D{{"create", "stat.counters"}, {"autoIndexId", false}}, nil)
69
70 if err := store.ensureIndexes(); err != nil {
71 session.Close()
72 return nil, err
73 }
74
75 // Put the used socket back in the pool.
76 session.Refresh()
77 return store, nil
78}
79
80func (s *Store) ensureIndexes() error {
81 session := s.session
82 indexes := []struct {
83 c *mgo.Collection
84 i mgo.Index
85 }{{
86 session.StatCounters(),
87 mgo.Index{Key: []string{"k", "t"}, Unique: true},
88 }, {
89 session.StatTokens(),
90 mgo.Index{Key: []string{"t"}, Unique: true},
91 }, {
92 session.Charms(),
93 mgo.Index{Key: []string{"urls", "revision"}, Unique: true},
94 }, {
95 session.Events(),
96 mgo.Index{Key: []string{"urls", "digest"}},
97 }}
98 for _, idx := range indexes {
99 err := idx.c.EnsureIndex(idx.i)
100 if err != nil {
101 log.Printf("store: Error ensuring stat.counters index: %v", err)
102 return err
103 }
104 }
105 return nil
106}
107
108// Close terminates the connection with the store.
109func (s *Store) Close() {
110 s.session.Close()
111}
112
113// statsKey returns the compound statistics identifier that represents key.
114// If write is true, the identifier will be created if necessary.
115// Identifiers have a form similar to "ab:c:def:", where each section is a
116// base-32 number that represents the respective word in key. This form
117// allows efficiently indexing and searching for prefixes, while detaching
118// the key content and size from the actual words used in key.
119func (s *Store) statsKey(session *storeSession, key []string, write bool) (string, error) {
120 if len(key) == 0 {
121 return "", fmt.Errorf("store: empty statistics key")
122 }
123 tokens := session.StatTokens()
124 skey := make([]byte, 0, len(key)*4)
125 // Retry limit is mainly to prevent infinite recursion in edge cases,
126 // such as if the database is ever run in read-only mode.
127 // The logic below should deteministically stop in normal scenarios.
128 var err error
129 for i, retry := 0, 30; i < len(key) && retry > 0; retry-- {
130 err = nil
131 id, found := s.statsTokenId(key[i])
132 if !found {
133 var t struct {
134 Id int "_id"
135 Token string "t"
136 }
137 err = tokens.Find(bson.D{{"t", key[i]}}).One(&t)
138 if err == mgo.ErrNotFound {
139 if !write {
140 return "", ErrNotFound
141 }
142 t.Id, err = tokens.Count()
143 if err != nil {
144 continue
145 }
146 t.Id++
147 t.Token = key[i]
148 err = tokens.Insert(&t)
149 }
150 if err != nil {
151 continue
152 }
153 s.cacheStatsTokenId(t.Token, t.Id)
154 id = t.Id
155 }
156 skey = strconv.AppendInt(skey, int64(id), 32)
157 skey = append(skey, ':')
158 i++
159 }
160 if err != nil {
161 return "", err
162 }
163 return string(skey), nil
164}
165
166const statsTokenCacheSize = 512
167
168// cacheStatsTokenId adds the id for token into the cache.
169// The cache has two generations so that the least frequently used
170// tokens are evicted regularly.
171func (s *Store) cacheStatsTokenId(token string, id int) {
172 s.cacheMu.Lock()
173 defer s.cacheMu.Unlock()
174 // Can't possibly be >, but reviews want it for defensiveness.
175 if len(s.statsTokenNew) >= statsTokenCacheSize {
176 s.statsTokenOld = s.statsTokenNew
177 s.statsTokenNew = nil
178 }
179 if s.statsTokenNew == nil {
180 s.statsTokenNew = make(map[string]int, statsTokenCacheSize)
181 }
182 s.statsTokenNew[token] = id
183}
184
185// statsTokenId returns the id for token from the cache, if found.
186func (s *Store) statsTokenId(token string) (id int, found bool) {
187 s.cacheMu.RLock()
188 id, found = s.statsTokenNew[token]
189 if found {
190 s.cacheMu.RUnlock()
191 return
192 }
193 id, found = s.statsTokenOld[token]
194 s.cacheMu.RUnlock()
195 if found {
196 s.cacheStatsTokenId(token, id)
197 }
198 return
199}
200
201var counterEpoch = time.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC).Unix()
202
203// IncCounter increases by one the counter associated with the composed key.
204func (s *Store) IncCounter(key []string) error {
205 session := s.session.Copy()
206 defer session.Close()
207
208 skey, err := s.statsKey(session, key, true)
209 if err != nil {
210 return err
211 }
212
213 t := time.Now().UTC()
214 // Round to the start of the minute so we get one document per minute at most.
215 t = t.Add(-time.Duration(t.Second()) * time.Second)
216 counters := session.StatCounters()
217 _, err = counters.Upsert(bson.D{{"k", skey}, {"t", int32(t.Unix() - counterEpoch)}}, bson.D{{"$inc", bson.D{{"c", 1}}}})
218 return err
219}
220
221// SumCounter returns the sum of all the counters that exactly match key,
222// or that are prefixed by it if prefix is true.
223func (s *Store) SumCounter(key []string, prefix bool) (count int64, err error) {
224 session := s.session.Copy()
225 defer session.Close()
226
227 skey, err := s.statsKey(session, key, false)
228 if err == ErrNotFound {
229 return 0, nil
230 }
231 if err != nil {
232 return 0, err
233 }
234
235 var regex string
236 if prefix {
237 regex = "^" + skey
238 } else {
239 regex = "^" + skey + "$"
240 }
241
242 job := mgo.MapReduce{
243 Map: "function() { emit('count', this.c); }",
244 Reduce: "function(key, values) { return Array.sum(values); }",
245 }
246 var result []struct{ Value int64 }
247 counters := session.StatCounters()
248 _, err = counters.Find(bson.D{{"k", bson.D{{"$regex", regex}}}}).MapReduce(&job, &result)
249 if len(result) > 0 {
250 return result[0].Value, err
251 }
252 return 0, err
253}
254
255// A CharmPublisher is responsible for importing a charm dir onto the store.
256type CharmPublisher struct {
257 revision int
258 w *charmWriter
259}
260
261// Revision returns the revision that will be assigned to the published charm.
262func (p *CharmPublisher) Revision() int {
263 return p.revision
264}
265
266// CharmDir matches the part of the interface of *charm.Dir that is necessary
267// to publish a charm. Using this interface rather than *charm.Dir directly
268// makes testing some aspects of the store possible.
269type CharmDir interface {
270 Meta() *charm.Meta
271 Config() *charm.Config
272 SetRevision(revision int)
273 BundleTo(w io.Writer) error
274}
275
276// Statically ensure that *charm.Dir is indeed a CharmDir.
277var _ CharmDir = (*charm.Dir)(nil)
278
279// Publish bundles charm and writes it to the store. The written charm
280// bundle will have its revision set to the result of Revision.
281// Publish must be called only once for a CharmPublisher.
282func (p *CharmPublisher) Publish(charm CharmDir) error {
283 w := p.w
284 if w == nil {
285 panic("CharmPublisher already published a charm")
286 }
287 p.w = nil
288 w.charm = charm
289 // TODO: Refactor to BundleTo(w, revision)
290 charm.SetRevision(p.revision)
291 err := charm.BundleTo(w)
292 if err == nil {
293 err = w.finish()
294 } else {
295 w.abort()
296 }
297 return err
298}
299
300// CharmPublisher returns a new CharmPublisher for importing a charm that
301// will be made available in the store at all of the provided URLs.
302// The digest parameter must contain the unique identifier that
303// represents the charm data being imported (e.g. the VCS revision sha1).
304// ErrRedundantUpdate is returned if all of the provided urls are
305// already associated to that digest.
306func (s *Store) CharmPublisher(urls []*charm.URL, digest string) (p *CharmPublisher, err error) {
307 log.Printf("store: Trying to add charms %v with key %q...", urls, digest)
308 if err = mustLackRevision("CharmPublisher", urls...); err != nil {
309 return
310 }
311 session := s.session.Copy()
312 defer session.Close()
313
314 maxRev := -1
315 newKey := false
316 charms := session.Charms()
317 doc := charmDoc{}
318 for i := range urls {
319 urlStr := urls[i].String()
320 err = charms.Find(bson.D{{"urls", urlStr}}).Sort("-revision").One(&doc)
321 if err == mgo.ErrNotFound {
322 log.Printf("store: Charm %s not yet in the store.", urls[i])
323 newKey = true
324 continue
325 }
326 if doc.Digest != digest {
327 log.Printf("store: Charm %s is out of date with revision key %q.", urlStr, digest)
328 newKey = true
329 }
330 if err != nil {
331 log.Printf("store: Unknown error looking for charm %s: %s", urlStr, err)
332 return
333 }
334 if doc.Revision > maxRev {
335 maxRev = doc.Revision
336 }
337 }
338 if !newKey {
339 log.Printf("store: All charms have revision key %q. Nothing to update.", digest)
340 err = ErrRedundantUpdate
341 return
342 }
343 revision := maxRev + 1
344 log.Printf("store: Preparing writer to add charms with revision %d.", revision)
345 w := &charmWriter{
346 store: s,
347 urls: urls,
348 revision: revision,
349 digest: digest,
350 }
351 return &CharmPublisher{revision, w}, nil
352}
353
354// charmWriter is an io.Writer that writes charm bundles to the charms GridFS.
355type charmWriter struct {
356 store *Store
357 session *storeSession
358 file *mgo.GridFile
359 sha256 hash.Hash
360 charm CharmDir
361 urls []*charm.URL
362 revision int
363 digest string
364}
365
366// Write creates an entry in the charms GridFS when first called,
367// and streams all written data into it.
368func (w *charmWriter) Write(data []byte) (n int, err error) {
369 if w.file == nil {
370 w.session = w.store.session.Copy()
371 w.file, err = w.session.CharmFS().Create("")
372 if err != nil {
373 log.Printf("store: Failed to create GridFS file: %v", err)
374 return 0, err
375 }
376 w.sha256 = sha256.New()
377 log.Printf("store: Creating GridFS file with id %q...", w.file.Id().(bson.ObjectId).Hex())
378 }
379 _, err = w.sha256.Write(data)
380 if err != nil {
381 panic("hash.Hash should never error")
382 }
383 return w.file.Write(data)
384}
385
386// abort cancels the charm writing.
387func (w *charmWriter) abort() {
388 if w.file != nil {
389 // Ignore error. Already aborting due to a preceding bad situation
390 // elsewhere. This error is not important right now.
391 _ = w.file.Close()
392 w.session.Close()
393 }
394}
395
396// finish completes the charm writing process and inserts the final metadata.
397// After it completes the charm will be available for consumption.
398func (w *charmWriter) finish() error {
399 if w.file == nil {
400 return nil
401 }
402 defer w.session.Close()
403 id := w.file.Id()
404 size := w.file.Size()
405 err := w.file.Close()
406 if err != nil {
407 log.Printf("store: Failed to close GridFS file: %v", err)
408 return err
409 }
410 charms := w.session.Charms()
411 sha256 := hex.EncodeToString(w.sha256.Sum(nil))
412 charm := charmDoc{
413 w.urls,
414 w.revision,
415 w.digest,
416 sha256,
417 size,
418 id.(bson.ObjectId),
419 w.charm.Meta(),
420 w.charm.Config(),
421 }
422 if err = charms.Insert(&charm); err != nil {
423 err = maybeConflict(err)
424 log.Printf("store: Failed to insert new revision of charm %v: %v", w.urls, err)
425 return err
426 }
427 return nil
428}
429
430type CharmInfo struct {
431 revision int
432 digest string
433 sha256 string
434 size int64
435 fileId bson.ObjectId
436 meta *charm.Meta
437 config *charm.Config
438}
439
440// Statically ensure CharmInfo is a charm.Charm.
441var _ charm.Charm = (*CharmInfo)(nil)
442
443// Revision returns the store charm's revision.
444func (ci *CharmInfo) Revision() int {
445 return ci.revision
446}
447
448// BundleSha256 returns the sha256 checksum for the stored charm bundle.
449func (ci *CharmInfo) BundleSha256() string {
450 return ci.sha256
451}
452
453// BundleSize returns the size for the stored charm bundle.
454func (ci *CharmInfo) BundleSize() int64 {
455 return ci.size
456}
457
458// Digest returns the unique identifier that represents the charm
459// data imported. This is typically set to the VCS revision digest.
460func (ci *CharmInfo) Digest() string {
461 return ci.digest
462}
463
464// Meta returns the charm.Meta details for the stored charm.
465func (ci *CharmInfo) Meta() *charm.Meta {
466 return ci.meta
467}
468
469// Config returns the charm.Config details for the stored charm.
470func (ci *CharmInfo) Config() *charm.Config {
471 return ci.config
472}
473
474// CharmInfo retrieves the CharmInfo value for the charm at url.
475func (s *Store) CharmInfo(url *charm.URL) (info *CharmInfo, err error) {
476 session := s.session.Copy()
477 defer session.Close()
478
479 log.Debugf("store: Retrieving charm info for %s", url)
480 rev := url.Revision
481 url = url.WithRevision(-1)
482
483 charms := session.Charms()
484 var cdoc charmDoc
485 var qdoc interface{}
486 if rev == -1 {
487 qdoc = bson.D{{"urls", url}}
488 } else {
489 qdoc = bson.D{{"urls", url}, {"revision", rev}}
490 }
491 err = charms.Find(qdoc).Sort("-revision").One(&cdoc)
492 if err != nil {
493 log.Printf("store: Failed to find charm %s: %v", url, err)
494 return nil, ErrNotFound
495 }
496 info = &CharmInfo{
497 cdoc.Revision,
498 cdoc.Digest,
499 cdoc.Sha256,
500 cdoc.Size,
501 cdoc.FileId,
502 cdoc.Meta,
503 cdoc.Config,
504 }
505 return info, nil
506}
507
508// OpenCharm opens for reading via rc the charm currently available at url.
509// rc must be closed after dealing with it or resources will leak.
510func (s *Store) OpenCharm(url *charm.URL) (info *CharmInfo, rc io.ReadCloser, err error) {
511 log.Debugf("store: Opening charm %s", url)
512 info, err = s.CharmInfo(url)
513 if err != nil {
514 return nil, nil, err
515 }
516 session := s.session.Copy()
517 file, err := session.CharmFS().OpenId(info.fileId)
518 if err != nil {
519 log.Printf("store: Failed to open GridFS file for charm %s: %v", url, err)
520 session.Close()
521 return nil, nil, err
522 }
523 rc = &reader{session, file}
524 return
525}
526
527type reader struct {
528 session *storeSession
529 file *mgo.GridFile
530}
531
532// Read consumes data from the opened charm.
533func (r *reader) Read(buf []byte) (n int, err error) {
534 return r.file.Read(buf)
535}
536
537// Close closes the opened charm and frees associated resources.
538func (r *reader) Close() error {
539 err := r.file.Close()
540 r.session.Close()
541 return err
542}
543
544// charmDoc represents the document stored in MongoDB for a charm.
545type charmDoc struct {
546 URLs []*charm.URL
547 Revision int
548 Digest string
549 Sha256 string
550 Size int64
551 FileId bson.ObjectId
552 Meta *charm.Meta
553 Config *charm.Config
554}
555
556// LockUpdates acquires a server-side lock for updating a single charm
557// that is supposed to be made available in all of the provided urls.
558// If the lock can't be acquired in any of the urls, an error will be
559// immediately returned.
560// In the usual case, any locking done is undone when an error happens,
561// or when l.Unlock is called. If something else goes wrong, the locks
562// will also expire after the period defined in UpdateTimeout.
563func (s *Store) LockUpdates(urls []*charm.URL) (l *UpdateLock, err error) {
564 session := s.session.Copy()
565 keys := make([]string, len(urls))
566 for i := range urls {
567 keys[i] = urls[i].String()
568 }
569 sort.Strings(keys)
570 l = &UpdateLock{keys, session.Locks(), bson.Now()}
571 if err = l.tryLock(); err != nil {
572 session.Close()
573 return nil, err
574 }
575 return l, nil
576}
577
578// UpdateLock represents an acquired update lock over a set of charm URLs.
579type UpdateLock struct {
580 keys []string
581 locks *mgo.Collection
582 time time.Time
583}
584
585// Unlock removes the previously acquired server-side lock that prevents
586// other processes from attempting to update a set of charm URLs.
587func (l *UpdateLock) Unlock() {
588 log.Debugf("store: Unlocking charms for future updates: %v", l.keys)
589 defer l.locks.Database.Session.Close()
590 for i := len(l.keys) - 1; i >= 0; i-- {
591 // Using time below ensures only the proper lock is removed.
592 // Can't do much about errors here. Locks will expire anyway.
593 l.locks.Remove(bson.D{{"_id", l.keys[i]}, {"time", l.time}})
594 }
595}
596
597// tryLock tries locking l.keys, one at a time, and succeeds only if it
598// can lock all of them in order. The keys should be pre-sorted so that
599// two-way conflicts can't happen. If any of the keys fail to be locked,
600// and expiring the old lock doesn't work, tryLock undoes all previous
601// locks and aborts with an error.
602func (l *UpdateLock) tryLock() error {
603 for i, key := range l.keys {
604 log.Debugf("store: Trying to lock charm %s for updates...", key)
605 doc := bson.D{{"_id", key}, {"time", l.time}}
606 err := l.locks.Insert(doc)
607 if err == nil {
608 log.Debugf("store: Charm %s is now locked for updates.", key)
609 continue
610 }
611 if lerr, ok := err.(*mgo.LastError); ok && lerr.Code == 11000 {
612 log.Debugf("store: Charm %s is locked. Trying to expire lock.", key)
613 l.tryExpire(key)
614 err = l.locks.Insert(doc)
615 if err == nil {
616 log.Debugf("store: Charm %s is now locked for updates.", key)
617 continue
618 }
619 }
620 // Couldn't lock everyone. Undo previous locks.
621 for j := i - 1; j >= 0; j-- {
622 // Using time below should be unnecessary, but it's an extra check.
623 // Can't do anything about errors here. Lock will expire anyway.
624 l.locks.Remove(bson.D{{"_id", l.keys[j]}, {"time", l.time}})
625 }
626 err = maybeConflict(err)
627 log.Printf("store: Can't lock charms %v for updating: %v", l.keys, err)
628 return err
629 }
630 return nil
631}
632
633// tryExpire attempts to remove outdated locks from the database.
634func (l *UpdateLock) tryExpire(key string) {
635 // Ignore errors. If nothing happens the key will continue locked.
636 l.locks.Remove(bson.D{{"_id", key}, {"time", bson.D{{"$lt", bson.Now().Add(-UpdateTimeout)}}}})
637}
638
639// maybeConflict returns an ErrUpdateConflict if err is a mgo
640// insert conflict LastError, or err itself otherwise.
641func maybeConflict(err error) error {
642 if lerr, ok := err.(*mgo.LastError); ok && lerr.Code == 11000 {
643 return ErrUpdateConflict
644 }
645 return err
646}
647
648// storeSession wraps a mgo.Session ands adds a few convenience methods.
649type storeSession struct {
650 *mgo.Session
651}
652
653// Copy copies the storeSession and its underlying mgo session.
654func (s *storeSession) Copy() *storeSession {
655 return &storeSession{s.Session.Copy()}
656}
657
658// Charms returns the mongo collection where charms are stored.
659func (s *storeSession) Charms() *mgo.Collection {
660 return s.DB("juju").C("charms")
661}
662
663// Charms returns a mgo.GridFS to read and write charms.
664func (s *storeSession) CharmFS() *mgo.GridFS {
665 return s.DB("juju").GridFS("charmfs")
666}
667
668// Events returns the mongo collection where charm events are stored.
669func (s *storeSession) Events() *mgo.Collection {
670 return s.DB("juju").C("events")
671}
672
673// Locks returns the mongo collection where charm locks are stored.
674func (s *storeSession) Locks() *mgo.Collection {
675 return s.DB("juju").C("locks")
676}
677
678// StatTokens returns the mongo collection for storing key tokens
679// for statistics collection.
680func (s *storeSession) StatTokens() *mgo.Collection {
681 return s.DB("juju").C("stat.tokens")
682}
683
684// StatCounters returns the mongo collection for counter values.
685func (s *storeSession) StatCounters() *mgo.Collection {
686 return s.DB("juju").C("stat.counters")
687}
688
689type CharmEventKind int
690
691const (
692 EventPublished CharmEventKind = iota + 1
693 EventPublishError
694
695 EventKindCount
696)
697
698func (k CharmEventKind) String() string {
699 switch k {
700 case EventPublished:
701 return "published"
702 case EventPublishError:
703 return "publish-error"
704 }
705 panic(fmt.Errorf("unknown charm event kind %d", k))
706}
707
708// CharmEvent is a record for an event relating to one or more charm URLs.
709type CharmEvent struct {
710 Kind CharmEventKind
711 Digest string
712 Revision int
713 URLs []*charm.URL
714 Errors []string `bson:",omitempty"`
715 Warnings []string `bson:",omitempty"`
716 Time time.Time
717}
718
719// LogCharmEvent records an event related to one or more charm URLs.
720func (s *Store) LogCharmEvent(event *CharmEvent) (err error) {
721 log.Printf("store: Adding charm event for %v with key %q: %s", event.URLs, event.Digest, event.Kind)
722 if err = mustLackRevision("LogCharmEvent", event.URLs...); err != nil {
723 return
724 }
725 session := s.session.Copy()
726 defer session.Close()
727 if event.Kind == 0 || event.Digest == "" || len(event.URLs) == 0 {
728 return fmt.Errorf("LogCharmEvent: need valid Kind, Digest and URLs")
729 }
730 if event.Time.IsZero() {
731 event.Time = time.Now()
732 }
733 events := session.Events()
734 return events.Insert(event)
735}
736
737// CharmEvent returns the most recent event associated with url
738// and digest. If the specified event isn't found the error
739// ErrUnknownChange will be returned.
740func (s *Store) CharmEvent(url *charm.URL, digest string) (*CharmEvent, error) {
741 // TODO: It'd actually make sense to find the charm event after the
742 // revision id, but since we don't care about that now, just make sure
743 // we don't write bad code.
744 if err := mustLackRevision("CharmEvent", url); err != nil {
745 return nil, err
746 }
747 session := s.session.Copy()
748 defer session.Close()
749
750 events := session.Events()
751 event := &CharmEvent{Digest: digest}
752 query := events.Find(bson.D{{"urls", url}, {"digest", digest}})
753 query.Sort("-time")
754 err := query.One(&event)
755 if err == mgo.ErrNotFound {
756 return nil, ErrNotFound
757 }
758 if err != nil {
759 return nil, err
760 }
761 return event, nil
762}
763
764// mustLackRevision returns an error if any of the urls has a revision.
765func mustLackRevision(context string, urls ...*charm.URL) error {
766 for _, url := range urls {
767 if url.Revision != -1 {
768 err := fmt.Errorf("%s: got charm URL with revision: %s", context, url)
769 log.Printf("store: %v", err)
770 return err
771 }
772 }
773 return nil
774}
7750
=== removed file 'store/store_test.go'
--- store/store_test.go 2012-11-20 07:18:32 +0000
+++ store/store_test.go 1970-01-01 00:00:00 +0000
@@ -1,608 +0,0 @@
1package store_test
2
3import (
4 "fmt"
5 "io"
6 "io/ioutil"
7 "labix.org/v2/mgo/bson"
8 . "launchpad.net/gocheck"
9 "launchpad.net/juju-core/charm"
10 "launchpad.net/juju-core/log"
11 "launchpad.net/juju-core/store"
12 "launchpad.net/juju-core/testing"
13 "strconv"
14 "sync"
15 stdtesting "testing"
16 "time"
17)
18
19func Test(t *stdtesting.T) {
20 TestingT(t)
21}
22
23var _ = Suite(&StoreSuite{})
24var _ = Suite(&TrivialSuite{})
25
26type StoreSuite struct {
27 MgoSuite
28 testing.HTTPSuite
29 store *store.Store
30}
31
32type TrivialSuite struct{}
33
34func (s *StoreSuite) SetUpSuite(c *C) {
35 s.MgoSuite.SetUpSuite(c)
36 s.HTTPSuite.SetUpSuite(c)
37}
38
39func (s *StoreSuite) TearDownSuite(c *C) {
40 s.HTTPSuite.TearDownSuite(c)
41 s.MgoSuite.TearDownSuite(c)
42}
43
44func (s *StoreSuite) SetUpTest(c *C) {
45 s.MgoSuite.SetUpTest(c)
46 var err error
47 s.store, err = store.Open(s.Addr)
48 c.Assert(err, IsNil)
49 log.Target = c
50 log.Debug = true
51}
52
53func (s *StoreSuite) TearDownTest(c *C) {
54 s.HTTPSuite.TearDownTest(c)
55 if s.store != nil {
56 s.store.Close()
57 }
58 s.MgoSuite.TearDownTest(c)
59}
60
61// FakeCharmDir is a charm that implements the interface that the
62// store publisher cares about.
63type FakeCharmDir struct {
64 revision interface{} // so we can tell if it's not set.
65 error string
66}
67
68func (d *FakeCharmDir) Meta() *charm.Meta {
69 return &charm.Meta{
70 Name: "fakecharm",
71 Summary: "Fake charm for testing purposes.",
72 Description: "This is a fake charm for testing purposes.\n",
73 Provides: make(map[string]charm.Relation),
74 Requires: make(map[string]charm.Relation),
75 Peers: make(map[string]charm.Relation),
76 }
77}
78
79func (d *FakeCharmDir) Config() *charm.Config {
80 return &charm.Config{make(map[string]charm.Option)}
81}
82
83func (d *FakeCharmDir) SetRevision(revision int) {
84 d.revision = revision
85}
86
87func (d *FakeCharmDir) BundleTo(w io.Writer) error {
88 if d.error == "beforeWrite" {
89 return fmt.Errorf(d.error)
90 }
91 _, err := w.Write([]byte(fmt.Sprintf("charm-revision-%v", d.revision)))
92 if d.error == "afterWrite" {
93 return fmt.Errorf(d.error)
94 }
95 return err
96}
97
98func (s *StoreSuite) TestCharmPublisherWithRevisionedURL(c *C) {
99 urls := []*charm.URL{charm.MustParseURL("cs:oneiric/wordpress-0")}
100 pub, err := s.store.CharmPublisher(urls, "some-digest")
101 c.Assert(err, ErrorMatches, "CharmPublisher: got charm URL with revision: cs:oneiric/wordpress-0")
102 c.Assert(pub, IsNil)
103}
104
105func (s *StoreSuite) TestCharmPublisher(c *C) {
106 urlA := charm.MustParseURL("cs:oneiric/wordpress-a")
107 urlB := charm.MustParseURL("cs:oneiric/wordpress-b")
108 urls := []*charm.URL{urlA, urlB}
109
110 pub, err := s.store.CharmPublisher(urls, "some-digest")
111 c.Assert(err, IsNil)
112 c.Assert(pub.Revision(), Equals, 0)
113
114 err = pub.Publish(testing.Charms.ClonedDir(c.MkDir(), "series", "dummy"))
115 c.Assert(err, IsNil)
116
117 for _, url := range urls {
118 info, rc, err := s.store.OpenCharm(url)
119 c.Assert(err, IsNil)
120 c.Assert(info.Revision(), Equals, 0)
121 c.Assert(info.Digest(), Equals, "some-digest")
122 data, err := ioutil.ReadAll(rc)
123 c.Check(err, IsNil)
124 err = rc.Close()
125 c.Assert(err, IsNil)
126 bundle, err := charm.ReadBundleBytes(data)
127 c.Assert(err, IsNil)
128
129 // The same information must be available by reading the
130 // full charm data...
131 c.Assert(bundle.Meta().Name, Equals, "dummy")
132 c.Assert(bundle.Config().Options["title"].Default, Equals, "My Title")
133
134 // ... and the queriable details.
135 c.Assert(info.Meta().Name, Equals, "dummy")
136 c.Assert(info.Config().Options["title"].Default, Equals, "My Title")
137
138 info2, err := s.store.CharmInfo(url)
139 c.Assert(err, IsNil)
140 c.Assert(info2, DeepEquals, info)
141 }
142}
143
144func (s *StoreSuite) TestCharmPublishError(c *C) {
145 url := charm.MustParseURL("cs:oneiric/wordpress")
146 urls := []*charm.URL{url}
147
148 // Publish one successfully to bump the revision so we can
149 // make sure it is being correctly set below.
150 pub, err := s.store.CharmPublisher(urls, "one-digest")
151 c.Assert(err, IsNil)
152 c.Assert(pub.Revision(), Equals, 0)
153 err = pub.Publish(&FakeCharmDir{})
154 c.Assert(err, IsNil)
155
156 pub, err = s.store.CharmPublisher(urls, "another-digest")
157 c.Assert(err, IsNil)
158 c.Assert(pub.Revision(), Equals, 1)
159 err = pub.Publish(&FakeCharmDir{error: "beforeWrite"})
160 c.Assert(err, ErrorMatches, "beforeWrite")
161
162 pub, err = s.store.CharmPublisher(urls, "another-digest")
163 c.Assert(err, IsNil)
164 c.Assert(pub.Revision(), Equals, 1)
165 err = pub.Publish(&FakeCharmDir{error: "afterWrite"})
166 c.Assert(err, ErrorMatches, "afterWrite")
167
168 // Still at the original charm revision that succeeded first.
169 info, err := s.store.CharmInfo(url)
170 c.Assert(err, IsNil)
171 c.Assert(info.Revision(), Equals, 0)
172 c.Assert(info.Digest(), Equals, "one-digest")
173}
174
175func (s *StoreSuite) TestCharmInfoNotFound(c *C) {
176 info, err := s.store.CharmInfo(charm.MustParseURL("cs:oneiric/wordpress"))
177 c.Assert(err, Equals, store.ErrNotFound)
178 c.Assert(info, IsNil)
179}
180
181func (s *StoreSuite) TestRevisioning(c *C) {
182 urlA := charm.MustParseURL("cs:oneiric/wordpress-a")
183 urlB := charm.MustParseURL("cs:oneiric/wordpress-b")
184 urls := []*charm.URL{urlA, urlB}
185
186 tests := []struct {
187 urls []*charm.URL
188 data string
189 }{
190 {urls[0:], "charm-revision-0"},
191 {urls[1:], "charm-revision-1"},
192 {urls[0:], "charm-revision-2"},
193 }
194
195 for i, t := range tests {
196 pub, err := s.store.CharmPublisher(t.urls, fmt.Sprintf("digest-%d", i))
197 c.Assert(err, IsNil)
198 c.Assert(pub.Revision(), Equals, i)
199
200 err = pub.Publish(&FakeCharmDir{})
201 c.Assert(err, IsNil)
202 }
203
204 for i, t := range tests {
205 for _, url := range t.urls {
206 url = url.WithRevision(i)
207 info, rc, err := s.store.OpenCharm(url)
208 c.Assert(err, IsNil)
209 data, err := ioutil.ReadAll(rc)
210 cerr := rc.Close()
211 c.Assert(info.Revision(), Equals, i)
212 c.Assert(url.Revision, Equals, i) // Untouched.
213 c.Assert(cerr, IsNil)
214 c.Assert(string(data), Equals, string(t.data))
215 c.Assert(err, IsNil)
216 }
217 }
218
219 info, rc, err := s.store.OpenCharm(urlA.WithRevision(1))
220 c.Assert(err, Equals, store.ErrNotFound)
221 c.Assert(info, IsNil)
222 c.Assert(rc, IsNil)
223}
224
225func (s *StoreSuite) TestLockUpdates(c *C) {
226 urlA := charm.MustParseURL("cs:oneiric/wordpress-a")
227 urlB := charm.MustParseURL("cs:oneiric/wordpress-b")
228 urls := []*charm.URL{urlA, urlB}
229
230 // Lock update of just B to force a partial conflict.
231 lock1, err := s.store.LockUpdates(urls[1:])
232 c.Assert(err, IsNil)
233
234 // Partially conflicts with locked update above.
235 lock2, err := s.store.LockUpdates(urls)
236 c.Check(err, Equals, store.ErrUpdateConflict)
237 c.Check(lock2, IsNil)
238
239 lock1.Unlock()
240
241 // Trying again should work since lock1 was released.
242 lock3, err := s.store.LockUpdates(urls)
243 c.Assert(err, IsNil)
244 lock3.Unlock()
245}
246
247func (s *StoreSuite) TestLockUpdatesExpires(c *C) {
248 urlA := charm.MustParseURL("cs:oneiric/wordpress-a")
249 urlB := charm.MustParseURL("cs:oneiric/wordpress-b")
250 urls := []*charm.URL{urlA, urlB}
251
252 // Initiate an update of B only to force a partial conflict.
253 lock1, err := s.store.LockUpdates(urls[1:])
254 c.Assert(err, IsNil)
255
256 // Hack time to force an expiration.
257 locks := s.Session.DB("juju").C("locks")
258 selector := bson.M{"_id": urlB.String()}
259 update := bson.M{"time": bson.Now().Add(-store.UpdateTimeout - 10e9)}
260 err = locks.Update(selector, update)
261 c.Check(err, IsNil)
262
263 // Works due to expiration of previous lock.
264 lock2, err := s.store.LockUpdates(urls)
265 c.Assert(err, IsNil)
266 defer lock2.Unlock()
267
268 // The expired lock was forcefully killed. Unlocking it must
269 // not interfere with lock2 which is still alive.
270 lock1.Unlock()
271
272 // The above statement was a NOOP and lock2 is still in effect,
273 // so attempting another lock must necessarily fail.
274 lock3, err := s.store.LockUpdates(urls)
275 c.Check(err == store.ErrUpdateConflict, Equals, true)
276 c.Check(lock3, IsNil)
277}
278
279func (s *StoreSuite) TestConflictingUpdate(c *C) {
280 // This test checks that if for whatever reason the locking
281 // safety-net fails, adding two charms in parallel still
282 // results in a sane outcome.
283 url := charm.MustParseURL("cs:oneiric/wordpress")
284 urls := []*charm.URL{url}
285
286 pub1, err := s.store.CharmPublisher(urls, "some-digest")
287 c.Assert(err, IsNil)
288 c.Assert(pub1.Revision(), Equals, 0)
289
290 pub2, err := s.store.CharmPublisher(urls, "some-digest")
291 c.Assert(err, IsNil)
292 c.Assert(pub2.Revision(), Equals, 0)
293
294 // The first publishing attempt should work.
295 err = pub2.Publish(&FakeCharmDir{})
296 c.Assert(err, IsNil)
297
298 // Attempting to finish the second attempt should break,
299 // since it lost the race and the given revision is already
300 // in place.
301 err = pub1.Publish(&FakeCharmDir{})
302 c.Assert(err, Equals, store.ErrUpdateConflict)
303}
304
305func (s *StoreSuite) TestRedundantUpdate(c *C) {
306 urlA := charm.MustParseURL("cs:oneiric/wordpress-a")
307 urlB := charm.MustParseURL("cs:oneiric/wordpress-b")
308 urls := []*charm.URL{urlA, urlB}
309
310 pub, err := s.store.CharmPublisher(urls, "digest-0")
311 c.Assert(err, IsNil)
312 c.Assert(pub.Revision(), Equals, 0)
313 err = pub.Publish(&FakeCharmDir{})
314 c.Assert(err, IsNil)
315
316 // All charms are already on digest-0.
317 pub, err = s.store.CharmPublisher(urls, "digest-0")
318 c.Assert(err, ErrorMatches, "charm is up-to-date")
319 c.Assert(err, Equals, store.ErrRedundantUpdate)
320 c.Assert(pub, IsNil)
321
322 // Now add a second revision just for wordpress-b.
323 pub, err = s.store.CharmPublisher(urls[1:], "digest-1")
324 c.Assert(err, IsNil)
325 c.Assert(pub.Revision(), Equals, 1)
326 err = pub.Publish(&FakeCharmDir{})
327 c.Assert(err, IsNil)
328
329 // Same digest bumps revision because one of them was old.
330 pub, err = s.store.CharmPublisher(urls, "digest-1")
331 c.Assert(err, IsNil)
332 c.Assert(pub.Revision(), Equals, 2)
333 err = pub.Publish(&FakeCharmDir{})
334 c.Assert(err, IsNil)
335}
336
337const fakeRevZeroSha = "319095521ac8a62fa1e8423351973512ecca8928c9f62025e37de57c9ef07a53"
338
339func (s *StoreSuite) TestCharmBundleData(c *C) {
340 url := charm.MustParseURL("cs:oneiric/wordpress")
341 urls := []*charm.URL{url}
342
343 pub, err := s.store.CharmPublisher(urls, "key")
344 c.Assert(err, IsNil)
345 c.Assert(pub.Revision(), Equals, 0)
346
347 err = pub.Publish(&FakeCharmDir{})
348 c.Assert(err, IsNil)
349
350 info, rc, err := s.store.OpenCharm(url)
351 c.Assert(err, IsNil)
352 c.Check(info.BundleSha256(), Equals, fakeRevZeroSha)
353 c.Check(info.BundleSize(), Equals, int64(len("charm-revision-0")))
354 err = rc.Close()
355 c.Check(err, IsNil)
356}
357
358func (s *StoreSuite) TestLogCharmEventWithRevisionedURL(c *C) {
359 url := charm.MustParseURL("cs:oneiric/wordpress-0")
360 event := &store.CharmEvent{
361 Kind: store.EventPublishError,
362 Digest: "some-digest",
363 URLs: []*charm.URL{url},
364 }
365 err := s.store.LogCharmEvent(event)
366 c.Assert(err, ErrorMatches, "LogCharmEvent: got charm URL with revision: cs:oneiric/wordpress-0")
367
368 // This may work in the future, but not now.
369 event, err = s.store.CharmEvent(url, "some-digest")
370 c.Assert(err, ErrorMatches, "CharmEvent: got charm URL with revision: cs:oneiric/wordpress-0")
371 c.Assert(event, IsNil)
372}
373
374func (s *StoreSuite) TestLogCharmEvent(c *C) {
375 url1 := charm.MustParseURL("cs:oneiric/wordpress")
376 url2 := charm.MustParseURL("cs:oneiric/mysql")
377 urls := []*charm.URL{url1, url2}
378
379 event1 := &store.CharmEvent{
380 Kind: store.EventPublished,
381 Revision: 42,
382 Digest: "revKey1",
383 URLs: urls,
384 Warnings: []string{"A warning."},
385 Time: time.Unix(1, 0),
386 }
387 event2 := &store.CharmEvent{
388 Kind: store.EventPublished,
389 Revision: 42,
390 Digest: "revKey2",
391 URLs: urls,
392 Time: time.Unix(1, 0),
393 }
394 event3 := &store.CharmEvent{
395 Kind: store.EventPublishError,
396 Digest: "revKey2",
397 Errors: []string{"An error."},
398 URLs: urls[:1],
399 }
400
401 for _, event := range []*store.CharmEvent{event1, event2, event3} {
402 err := s.store.LogCharmEvent(event)
403 c.Assert(err, IsNil)
404 }
405
406 events := s.Session.DB("juju").C("events")
407 var s1, s2 map[string]interface{}
408
409 err := events.Find(bson.M{"digest": "revKey1"}).One(&s1)
410 c.Assert(err, IsNil)
411 c.Assert(s1["kind"], Equals, int(store.EventPublished))
412 c.Assert(s1["urls"], DeepEquals, []interface{}{"cs:oneiric/wordpress", "cs:oneiric/mysql"})
413 c.Assert(s1["warnings"], DeepEquals, []interface{}{"A warning."})
414 c.Assert(s1["errors"], IsNil)
415 c.Assert(s1["time"], DeepEquals, time.Unix(1, 0))
416
417 err = events.Find(bson.M{"digest": "revKey2", "kind": store.EventPublishError}).One(&s2)
418 c.Assert(err, IsNil)
419 c.Assert(s2["urls"], DeepEquals, []interface{}{"cs:oneiric/wordpress"})
420 c.Assert(s2["warnings"], IsNil)
421 c.Assert(s2["errors"], DeepEquals, []interface{}{"An error."})
422 c.Assert(s2["time"].(time.Time).After(bson.Now().Add(-10e9)), Equals, true)
423
424 // Mongo stores timestamps in milliseconds, so chop
425 // off the extra bits for comparison.
426 event3.Time = time.Unix(0, event3.Time.UnixNano()/1e6*1e6)
427
428 event, err := s.store.CharmEvent(urls[0], "revKey2")
429 c.Assert(err, IsNil)
430 c.Assert(event, DeepEquals, event3)
431
432 event, err = s.store.CharmEvent(urls[1], "revKey1")
433 c.Assert(err, IsNil)
434 c.Assert(event, DeepEquals, event1)
435
436 event, err = s.store.CharmEvent(urls[1], "revKeyX")
437 c.Assert(err, Equals, store.ErrNotFound)
438 c.Assert(event, IsNil)
439}
440
441func (s *StoreSuite) TestCounters(c *C) {
442 sum, err := s.store.SumCounter([]string{"a"}, false)
443 c.Assert(err, IsNil)
444 c.Assert(sum, Equals, int64(0))
445
446 for i := 0; i < 10; i++ {
447 err := s.store.IncCounter([]string{"a", "b", "c"})
448 c.Assert(err, IsNil)
449 }
450 for i := 0; i < 7; i++ {
451 s.store.IncCounter([]string{"a", "b"})
452 c.Assert(err, IsNil)
453 }
454 for i := 0; i < 3; i++ {
455 s.store.IncCounter([]string{"a", "z", "b"})
456 c.Assert(err, IsNil)
457 }
458
459 tests := []struct {
460 key []string
461 prefix bool
462 result int64
463 }{
464 {[]string{"a", "b", "c"}, false, 10},
465 {[]string{"a", "b"}, false, 7},
466 {[]string{"a", "z", "b"}, false, 3},
467 {[]string{"a", "b", "c"}, true, 10},
468 {[]string{"a", "b"}, true, 17},
469 {[]string{"a"}, true, 20},
470 {[]string{"b"}, true, 0},
471 }
472
473 for _, t := range tests {
474 c.Logf("Test: %#v\n", t)
475 sum, err := s.store.SumCounter(t.key, t.prefix)
476 c.Assert(err, IsNil)
477 c.Assert(sum, Equals, t.result)
478 }
479
480 // High-level interface works. Now check that the data is
481 // stored correctly.
482 counters := s.Session.DB("juju").C("stat.counters")
483 docs1, err := counters.Count()
484 c.Assert(err, IsNil)
485 if docs1 != 3 && docs1 != 4 {
486 fmt.Errorf("Expected 3 or 4 docs in counters collection, got %d", docs1)
487 }
488
489 // Hack times so that the next operation adds another document.
490 err = counters.Update(nil, bson.D{{"$set", bson.D{{"t", 1}}}})
491 c.Check(err, IsNil)
492
493 err = s.store.IncCounter([]string{"a", "b", "c"})
494 c.Assert(err, IsNil)
495
496 docs2, err := counters.Count()
497 c.Assert(err, IsNil)
498 c.Assert(docs2, Equals, docs1+1)
499
500 sum, err = s.store.SumCounter([]string{"a", "b", "c"}, false)
501 c.Assert(err, IsNil)
502 c.Assert(sum, Equals, int64(11))
503
504 sum, err = s.store.SumCounter([]string{"a"}, true)
505 c.Assert(err, IsNil)
506 c.Assert(sum, Equals, int64(21))
507}
508
509func (s *StoreSuite) TestCountersReadOnlySum(c *C) {
510 // Summing up an unknown key shouldn't add the key to the database.
511 sum, err := s.store.SumCounter([]string{"a", "b", "c"}, false)
512 c.Assert(err, IsNil)
513 c.Assert(sum, Equals, int64(0))
514
515 tokens := s.Session.DB("juju").C("stat.tokens")
516 n, err := tokens.Count()
517 c.Assert(err, IsNil)
518 c.Assert(n, Equals, 0)
519}
520
521func (s *StoreSuite) TestCountersTokenCaching(c *C) {
522 sum, err := s.store.SumCounter([]string{"a"}, false)
523 c.Assert(err, IsNil)
524 c.Assert(sum, Equals, int64(0))
525
526 const genSize = 512
527
528 // All of these will be cached, as we have two generations
529 // of genSize entries each.
530 for i := 0; i < genSize*2; i++ {
531 err := s.store.IncCounter([]string{strconv.Itoa(i)})
532 c.Assert(err, IsNil)
533 }
534
535 // Now go behind the scenes and corrupt all the tokens.
536 tokens := s.Session.DB("juju").C("stat.tokens")
537 iter := tokens.Find(nil).Iter()
538 var t struct {
539 Id int "_id"
540 Token string "t"
541 }
542 for iter.Next(&t) {
543 err := tokens.UpdateId(t.Id, bson.M{"$set": bson.M{"t": "corrupted" + t.Token}})
544 c.Assert(err, IsNil)
545 }
546 c.Assert(iter.Err(), IsNil)
547
548 // We can consult the counters for the cached entries still.
549 // First, check that the newest generation is good.
550 for i := genSize; i < genSize*2; i++ {
551 n, err := s.store.SumCounter([]string{strconv.Itoa(i)}, false)
552 c.Assert(err, IsNil)
553 c.Assert(n, Equals, int64(1))
554 }
555
556 // Now, we can still access a single entry of the older generation,
557 // but this will cause the generations to flip and thus the rest
558 // of the old generation will go away as the top half of the
559 // entries is turned into the old generation.
560 n, err := s.store.SumCounter([]string{"0"}, false)
561 c.Assert(err, IsNil)
562 c.Assert(n, Equals, int64(1))
563
564 // Now we've lost access to the rest of the old generation.
565 for i := 1; i < genSize; i++ {
566 n, err := s.store.SumCounter([]string{strconv.Itoa(i)}, false)
567 c.Assert(err, IsNil)
568 c.Assert(n, Equals, int64(0))
569 }
570
571 // But we still have all of the top half available since it was
572 // moved into the old generation.
573 for i := genSize; i < genSize*2; i++ {
574 n, err := s.store.SumCounter([]string{strconv.Itoa(i)}, false)
575 c.Assert(err, IsNil)
576 c.Assert(n, Equals, int64(1))
577 }
578}
579
580func (s *StoreSuite) TestCounterTokenUniqueness(c *C) {
581 var wg0, wg1 sync.WaitGroup
582 wg0.Add(10)
583 wg1.Add(10)
584 for i := 0; i < 10; i++ {
585 go func() {
586 wg0.Done()
587 wg0.Wait()
588 defer wg1.Done()
589 err := s.store.IncCounter([]string{"a"})
590 c.Check(err, IsNil)
591 }()
592 }
593 wg1.Wait()
594
595 sum, err := s.store.SumCounter([]string{"a"}, false)
596 c.Assert(err, IsNil)
597 c.Assert(sum, Equals, int64(10))
598}
599
600func (s *TrivialSuite) TestEventString(c *C) {
601 c.Assert(store.EventPublished, Matches, "published")
602 c.Assert(store.EventPublishError, Matches, "publish-error")
603 for kind := store.CharmEventKind(1); kind < store.EventKindCount; kind++ {
604 // This guarantees the switch in String is properly
605 // updated with new event kinds.
606 c.Assert(kind.String(), Matches, "[a-z-]+")
607 }
608}

Subscribers

People subscribed via source and target branches