Merge lp:~pedronis/ubuntu-push/fix-split-reuse into lp:ubuntu-push

Proposed by Samuele Pedroni
Status: Merged
Approved by: Samuele Pedroni
Approved revision: 76
Merged at revision: 75
Proposed branch: lp:~pedronis/ubuntu-push/fix-split-reuse
Merge into: lp:ubuntu-push
Diff against target: 151 lines (+62/-9)
4 files modified
protocol/messages.go (+6/-0)
protocol/messages_test.go (+3/-0)
server/broker/exchanges.go (+1/-0)
server/broker/exchanges_test.go (+52/-9)
To merge this branch: bzr merge lp:~pedronis/ubuntu-push/fix-split-reuse
Reviewer Review Type Date Requested Status
Nicola Larosa (community) Approve
Review via email: mp+208414@code.launchpad.net

Commit message

* fix exchanges reusing broadcast messages without resetting their split state
* fix the reversed inMsg/outMsg naming in tests

Description of the change

* fix exchanges reusing broadcast messages without resetting their split state

* fix the reversed inMsg/outMsg naming in tests

To post a comment you must log in.
Revision history for this message
Nicola Larosa (teknico) wrote :

LGTM

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'protocol/messages.go'
--- protocol/messages.go 2014-02-24 15:54:37 +0000
+++ protocol/messages.go 2014-02-26 16:06:13 +0000
@@ -93,6 +93,12 @@
 	return true
 }
 
+// Reset resets the splitting state if the message storage is to be
+// reused.
+func (b *BroadcastMsg) Reset() {
+	b.splitting = 0
+}
+
 // NOTIFICATIONS message
 type NotificationsMsg struct {
 	Type string `json:"T"`
=== modified file 'protocol/messages_test.go'
--- protocol/messages_test.go 2014-02-10 22:53:00 +0000
+++ protocol/messages_test.go 2014-02-26 16:06:13 +0000
@@ -99,4 +99,7 @@
 	n3 := len(b.Payloads)
 	c.Check(b.TopLevel, Equals, int64(n))
 	c.Check(n1+n2+n3, Equals, n)
+	// reset
+	b.Reset()
+	c.Check(b.splitting, Equals, 0)
 }
=== modified file 'server/broker/exchanges.go'
--- server/broker/exchanges.go 2014-02-10 23:19:08 +0000
+++ server/broker/exchanges.go 2014-02-26 16:06:13 +0000
@@ -61,6 +61,7 @@
 // Prepare session for a BROADCAST.
 func (sbe *BroadcastExchange) Prepare(sess BrokerSession) (outMessage protocol.SplittableMsg, inMessage interface{}, err error) {
 	scratchArea := sess.ExchangeScratchArea()
+	scratchArea.broadcastMsg.Reset()
 	scratchArea.broadcastMsg.Type = "broadcast"
 	clientLevel := sess.Levels()[sbe.ChanId]
 	payloads := filterByLevel(clientLevel, sbe.TopLevel, sbe.NotificationPayloads)
=== modified file 'server/broker/exchanges_test.go'
--- server/broker/exchanges_test.go 2014-02-10 23:19:08 +0000
+++ server/broker/exchanges_test.go 2014-02-26 16:06:13 +0000
@@ -18,6 +18,8 @@
 
 import (
 	"encoding/json"
+	"fmt"
+	"strings"
 	stdtesting "testing"
 
 	. "launchpad.net/gocheck"
@@ -45,19 +47,60 @@
 			json.RawMessage(`{"a":"y"}`),
 		},
 	}
-	inMsg, outMsg, err := exchg.Prepare(sess)
+	outMsg, inMsg, err := exchg.Prepare(sess)
 	c.Assert(err, IsNil)
 	// check
-	marshalled, err := json.Marshal(inMsg)
+	marshalled, err := json.Marshal(outMsg)
 	c.Assert(err, IsNil)
 	c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"a":"x"},{"a":"y"}]}`)
-	err = json.Unmarshal([]byte(`{"T":"ack"}`), outMsg)
+	err = json.Unmarshal([]byte(`{"T":"ack"}`), inMsg)
 	c.Assert(err, IsNil)
 	err = exchg.Acked(sess, true)
 	c.Assert(err, IsNil)
 	c.Check(sess.LevelsMap[store.SystemInternalChannelId], Equals, int64(3))
 }
 
+func (s *exchangesSuite) TestBroadcastExchangeReuseVsSplit(c *C) {
+	sess := &testing.TestBrokerSession{
+		LevelsMap: broker.LevelsMap(map[store.InternalChannelId]int64{}),
+	}
+	payloadFmt := fmt.Sprintf(`{"b":%%d,"bloat":"%s"}`, strings.Repeat("x", 1024*2))
+	needsSplitting := make([]json.RawMessage, 32)
+	for i := 0; i < 32; i++ {
+		needsSplitting[i] = json.RawMessage(fmt.Sprintf(payloadFmt, i))
+	}
+
+	topLevel := int64(len(needsSplitting))
+	exchg := &broker.BroadcastExchange{
+		ChanId: store.SystemInternalChannelId,
+		TopLevel: topLevel,
+		NotificationPayloads: needsSplitting,
+	}
+	outMsg, _, err := exchg.Prepare(sess)
+	c.Assert(err, IsNil)
+	parts := 0
+	for {
+		done := outMsg.Split()
+		parts++
+		if done {
+			break
+		}
+	}
+	c.Assert(parts, Equals, 2)
+	exchg = &broker.BroadcastExchange{
+		ChanId: store.SystemInternalChannelId,
+		TopLevel: topLevel + 2,
+		NotificationPayloads: []json.RawMessage{
+			json.RawMessage(`{"a":"x"}`),
+			json.RawMessage(`{"a":"y"}`),
+		},
+	}
+	outMsg, _, err = exchg.Prepare(sess)
+	c.Assert(err, IsNil)
+	done := outMsg.Split() // shouldn't panic
+	c.Check(done, Equals, true)
+}
+
 func (s *exchangesSuite) TestBroadcastExchangeAckMismatch(c *C) {
 	sess := &testing.TestBrokerSession{
 		LevelsMap: broker.LevelsMap(map[store.InternalChannelId]int64{}),
@@ -69,13 +112,13 @@
 			json.RawMessage(`{"a":"y"}`),
 		},
 	}
-	inMsg, outMsg, err := exchg.Prepare(sess)
+	outMsg, inMsg, err := exchg.Prepare(sess)
 	c.Assert(err, IsNil)
 	// check
-	marshalled, err := json.Marshal(inMsg)
+	marshalled, err := json.Marshal(outMsg)
 	c.Assert(err, IsNil)
 	c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"a":"y"}]}`)
-	err = json.Unmarshal([]byte(`{}`), outMsg)
+	err = json.Unmarshal([]byte(`{}`), inMsg)
 	c.Assert(err, IsNil)
 	err = exchg.Acked(sess, true)
 	c.Assert(err, Not(IsNil))
@@ -96,13 +139,13 @@
 			json.RawMessage(`{"a":"y"}`),
 		},
 	}
-	inMsg, outMsg, err := exchg.Prepare(sess)
+	outMsg, inMsg, err := exchg.Prepare(sess)
 	c.Assert(err, IsNil)
 	// check
-	marshalled, err := json.Marshal(inMsg)
+	marshalled, err := json.Marshal(outMsg)
 	c.Assert(err, IsNil)
 	c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"a":"y"}]}`)
-	err = json.Unmarshal([]byte(`{"T":"ack"}`), outMsg)
+	err = json.Unmarshal([]byte(`{"T":"ack"}`), inMsg)
 	c.Assert(err, IsNil)
 	err = exchg.Acked(sess, true)
 	c.Assert(err, IsNil)

Subscribers

People subscribed via source and target branches