Merge lp:~pedronis/ubuntu-push/fix-split-reuse into lp:ubuntu-push

Proposed by Samuele Pedroni
Status: Merged
Approved by: Samuele Pedroni
Approved revision: 76
Merged at revision: 75
Proposed branch: lp:~pedronis/ubuntu-push/fix-split-reuse
Merge into: lp:ubuntu-push
Diff against target: 151 lines (+62/-9)
4 files modified
protocol/messages.go (+6/-0)
protocol/messages_test.go (+3/-0)
server/broker/exchanges.go (+1/-0)
server/broker/exchanges_test.go (+52/-9)
To merge this branch: bzr merge lp:~pedronis/ubuntu-push/fix-split-reuse
Reviewer Review Type Date Requested Status
Nicola Larosa (community) Approve
Review via email: mp+208414@code.launchpad.net

Commit message

* fix the fact that exchanges were reusing broadcast messages but the split state of those wasn't reset
* fix naming in tests that was reversed

Description of the change

* fix the fact that exchanges were reusing broadcast messages but the split state of those wasn't reset

* fix naming in tests that was reversed

To post a comment you must log in.
Revision history for this message
Nicola Larosa (teknico) wrote :

LGTM

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'protocol/messages.go'
2--- protocol/messages.go 2014-02-24 15:54:37 +0000
3+++ protocol/messages.go 2014-02-26 16:06:13 +0000
4@@ -93,6 +93,12 @@
5 return true
6 }
7
8+// Reset resets the splitting state if the message storage is to be
9+// reused.
10+func (b *BroadcastMsg) Reset() {
11+ b.splitting = 0
12+}
13+
14 // NOTIFICATIONS message
15 type NotificationsMsg struct {
16 Type string `json:"T"`
17
18=== modified file 'protocol/messages_test.go'
19--- protocol/messages_test.go 2014-02-10 22:53:00 +0000
20+++ protocol/messages_test.go 2014-02-26 16:06:13 +0000
21@@ -99,4 +99,7 @@
22 n3 := len(b.Payloads)
23 c.Check(b.TopLevel, Equals, int64(n))
24 c.Check(n1+n2+n3, Equals, n)
25+ // reset
26+ b.Reset()
27+ c.Check(b.splitting, Equals, 0)
28 }
29
30=== modified file 'server/broker/exchanges.go'
31--- server/broker/exchanges.go 2014-02-10 23:19:08 +0000
32+++ server/broker/exchanges.go 2014-02-26 16:06:13 +0000
33@@ -61,6 +61,7 @@
34 // Prepare session for a BROADCAST.
35 func (sbe *BroadcastExchange) Prepare(sess BrokerSession) (outMessage protocol.SplittableMsg, inMessage interface{}, err error) {
36 scratchArea := sess.ExchangeScratchArea()
37+ scratchArea.broadcastMsg.Reset()
38 scratchArea.broadcastMsg.Type = "broadcast"
39 clientLevel := sess.Levels()[sbe.ChanId]
40 payloads := filterByLevel(clientLevel, sbe.TopLevel, sbe.NotificationPayloads)
41
42=== modified file 'server/broker/exchanges_test.go'
43--- server/broker/exchanges_test.go 2014-02-10 23:19:08 +0000
44+++ server/broker/exchanges_test.go 2014-02-26 16:06:13 +0000
45@@ -18,6 +18,8 @@
46
47 import (
48 "encoding/json"
49+ "fmt"
50+ "strings"
51 stdtesting "testing"
52
53 . "launchpad.net/gocheck"
54@@ -45,19 +47,60 @@
55 json.RawMessage(`{"a":"y"}`),
56 },
57 }
58- inMsg, outMsg, err := exchg.Prepare(sess)
59+ outMsg, inMsg, err := exchg.Prepare(sess)
60 c.Assert(err, IsNil)
61 // check
62- marshalled, err := json.Marshal(inMsg)
63+ marshalled, err := json.Marshal(outMsg)
64 c.Assert(err, IsNil)
65 c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"a":"x"},{"a":"y"}]}`)
66- err = json.Unmarshal([]byte(`{"T":"ack"}`), outMsg)
67+ err = json.Unmarshal([]byte(`{"T":"ack"}`), inMsg)
68 c.Assert(err, IsNil)
69 err = exchg.Acked(sess, true)
70 c.Assert(err, IsNil)
71 c.Check(sess.LevelsMap[store.SystemInternalChannelId], Equals, int64(3))
72 }
73
74+func (s *exchangesSuite) TestBroadcastExchangeReuseVsSplit(c *C) {
75+ sess := &testing.TestBrokerSession{
76+ LevelsMap: broker.LevelsMap(map[store.InternalChannelId]int64{}),
77+ }
78+ payloadFmt := fmt.Sprintf(`{"b":%%d,"bloat":"%s"}`, strings.Repeat("x", 1024*2))
79+ needsSplitting := make([]json.RawMessage, 32)
80+ for i := 0; i < 32; i++ {
81+ needsSplitting[i] = json.RawMessage(fmt.Sprintf(payloadFmt, i))
82+ }
83+
84+ topLevel := int64(len(needsSplitting))
85+ exchg := &broker.BroadcastExchange{
86+ ChanId: store.SystemInternalChannelId,
87+ TopLevel: topLevel,
88+ NotificationPayloads: needsSplitting,
89+ }
90+ outMsg, _, err := exchg.Prepare(sess)
91+ c.Assert(err, IsNil)
92+ parts := 0
93+ for {
94+ done := outMsg.Split()
95+ parts++
96+ if done {
97+ break
98+ }
99+ }
100+ c.Assert(parts, Equals, 2)
101+ exchg = &broker.BroadcastExchange{
102+ ChanId: store.SystemInternalChannelId,
103+ TopLevel: topLevel + 2,
104+ NotificationPayloads: []json.RawMessage{
105+ json.RawMessage(`{"a":"x"}`),
106+ json.RawMessage(`{"a":"y"}`),
107+ },
108+ }
109+ outMsg, _, err = exchg.Prepare(sess)
110+ c.Assert(err, IsNil)
111+ done := outMsg.Split() // shouldn't panic
112+ c.Check(done, Equals, true)
113+}
114+
115 func (s *exchangesSuite) TestBroadcastExchangeAckMismatch(c *C) {
116 sess := &testing.TestBrokerSession{
117 LevelsMap: broker.LevelsMap(map[store.InternalChannelId]int64{}),
118@@ -69,13 +112,13 @@
119 json.RawMessage(`{"a":"y"}`),
120 },
121 }
122- inMsg, outMsg, err := exchg.Prepare(sess)
123+ outMsg, inMsg, err := exchg.Prepare(sess)
124 c.Assert(err, IsNil)
125 // check
126- marshalled, err := json.Marshal(inMsg)
127+ marshalled, err := json.Marshal(outMsg)
128 c.Assert(err, IsNil)
129 c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"a":"y"}]}`)
130- err = json.Unmarshal([]byte(`{}`), outMsg)
131+ err = json.Unmarshal([]byte(`{}`), inMsg)
132 c.Assert(err, IsNil)
133 err = exchg.Acked(sess, true)
134 c.Assert(err, Not(IsNil))
135@@ -96,13 +139,13 @@
136 json.RawMessage(`{"a":"y"}`),
137 },
138 }
139- inMsg, outMsg, err := exchg.Prepare(sess)
140+ outMsg, inMsg, err := exchg.Prepare(sess)
141 c.Assert(err, IsNil)
142 // check
143- marshalled, err := json.Marshal(inMsg)
144+ marshalled, err := json.Marshal(outMsg)
145 c.Assert(err, IsNil)
146 c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"a":"y"}]}`)
147- err = json.Unmarshal([]byte(`{"T":"ack"}`), outMsg)
148+ err = json.Unmarshal([]byte(`{"T":"ack"}`), inMsg)
149 c.Assert(err, IsNil)
150 err = exchg.Acked(sess, true)
151 c.Assert(err, IsNil)

Subscribers

People subscribed via source and target branches