Merge lp:~pedronis/go-dbus/atomic-write-per-conn into lp:go-dbus/v1

Proposed by Samuele Pedroni
Status: Merged
Merged at revision: 133
Proposed branch: lp:~pedronis/go-dbus/atomic-write-per-conn
Merge into: lp:go-dbus/v1
Diff against target: 98 lines (+13/-15)
3 files modified
auth.go (+2/-2)
dbus.go (+10/-5)
message.go (+1/-8)
To merge this branch: bzr merge lp:~pedronis/go-dbus/atomic-write-per-conn
Reviewer Review Type Date Requested Status
John Lenton Approve
Review via email: mp+242257@code.launchpad.net

Commit message

make message writes on a connection atomic (not globally)

Description of the change

make message writes on a connection atomic (not globally)

To post a comment you must log in.
133. By Samuele Pedroni

fix double negations in comment

134. By Samuele Pedroni

don't lock in WriteTo itself but add doc comment

135. By Samuele Pedroni

make message writes on a connection atomic

Revision history for this message
John Lenton (chipaca) wrote :
Download full text (4.1 KiB)

Thank you.

Review: approve
On 19 Nov 2014 19:15, "Samuele Pedroni" <email address hidden>
wrote:

> Samuele Pedroni has proposed merging
> lp:~pedronis/go-dbus/atomic-write-per-conn into lp:go-dbus/v1.
>
> Commit message:
> make message writes on a connection atomic (not globally)
>
> Requested reviews:
> Go D-Bus Hackers (go-dbus)
>
> For more details, see:
>
> https://code.launchpad.net/~pedronis/go-dbus/atomic-write-per-conn/+merge/242257
>
> make message writes on a connection atomic (not globally)
> --
> Your team Go D-Bus Hackers is requested to review the proposed merge of
> lp:~pedronis/go-dbus/atomic-write-per-conn into lp:go-dbus/v1.
>
> === modified file 'auth.go'
> --- auth.go 2014-10-22 21:48:27 +0000
> +++ auth.go 2014-11-19 19:15:16 +0000
> @@ -124,7 +124,7 @@
> inStream := bufio.NewReader(conn)
> send := func(command ...[]byte) ([][]byte, error) {
> msg := bytes.Join(command, []byte(" "))
> - // writing at this point doesn't not need to be synced as
> the connection
> + // writing at this point does not need to be synced as the
> connection
> // is not shared at this point.
> _, err := conn.Write(append(msg, []byte("\r\n")...))
> if err != nil {
> @@ -182,7 +182,7 @@
> return errors.New("Could not authenticate with any
> mechanism")
> }
> // XXX: UNIX FD negotiation would go here.
> - // writing at this point doesn't not need to be synced as the
> connection
> + // writing at this point does not need to be synced as the
> connection
> // is not shared at this point.
> if _, err := conn.Write([]byte("BEGIN\r\n")); err != nil {
> return err
>
> === modified file 'dbus.go'
> --- dbus.go 2014-11-13 10:14:09 +0000
> +++ dbus.go 2014-11-19 19:15:16 +0000
> @@ -37,6 +37,7 @@
> // The unique name of this connection on the message bus.
> UniqueName string
> conn net.Conn
> + writeLock sync.Mutex
> busProxy BusDaemon
> lastSerial uint32
>
> @@ -251,12 +252,16 @@
> return atomic.AddUint32(&p.lastSerial, 1)
> }
>
> +func (p *Connection) atomicWriteMessage(msg *Message) error {
> + p.writeLock.Lock()
> + defer p.writeLock.Unlock()
> + _, err := msg.WriteTo(p.conn)
> + return err
> +}
> +
> func (p *Connection) Send(msg *Message) error {
> msg.setSerial(p.nextSerial())
> - if _, err := msg.WriteTo(p.conn); err != nil {
> - return err
> - }
> - return nil
> + return p.atomicWriteMessage(msg)
> }
>
> func (p *Connection) SendWithReply(msg *Message) (*Message, error) {
> @@ -272,7 +277,7 @@
> p.methodCallReplies[serial] = replyChan
> p.handlerMutex.Unlock()
>
> - if _, err := msg.WriteTo(p.conn); err != nil {
> + if err := p.atomicWriteMessage(msg); err != nil {
> p.handlerMutex.Lock()
> delete(p.methodCallReplies, serial)
> p.handlerMutex.Unlock()
>
> === modified file 'message.go'
> --- message.go 2014-10-22 16:08:20 +0000
...

Read more...

Revision history for this message
John Lenton (chipaca) wrote :
Download full text (4.3 KiB)

review approve
On 20 Nov 2014 09:01, "John Lenton" <email address hidden> wrote:

> Thank you.
>
> Review: approve
> On 19 Nov 2014 19:15, "Samuele Pedroni" <email address hidden>
> wrote:
>
>> Samuele Pedroni has proposed merging
>> lp:~pedronis/go-dbus/atomic-write-per-conn into lp:go-dbus/v1.
>>
>> Commit message:
>> make message writes on a connection atomic (not globally)
>>
>> Requested reviews:
>> Go D-Bus Hackers (go-dbus)
>>
>> For more details, see:
>>
>> https://code.launchpad.net/~pedronis/go-dbus/atomic-write-per-conn/+merge/242257
>>
>> make message writes on a connection atomic (not globally)
>> --
>> Your team Go D-Bus Hackers is requested to review the proposed merge of
>> lp:~pedronis/go-dbus/atomic-write-per-conn into lp:go-dbus/v1.
>>
>> === modified file 'auth.go'
>> --- auth.go 2014-10-22 21:48:27 +0000
>> +++ auth.go 2014-11-19 19:15:16 +0000
>> @@ -124,7 +124,7 @@
>> inStream := bufio.NewReader(conn)
>> send := func(command ...[]byte) ([][]byte, error) {
>> msg := bytes.Join(command, []byte(" "))
>> - // writing at this point doesn't not need to be synced as
>> the connection
>> + // writing at this point does not need to be synced as
>> the connection
>> // is not shared at this point.
>> _, err := conn.Write(append(msg, []byte("\r\n")...))
>> if err != nil {
>> @@ -182,7 +182,7 @@
>> return errors.New("Could not authenticate with any
>> mechanism")
>> }
>> // XXX: UNIX FD negotiation would go here.
>> - // writing at this point doesn't not need to be synced as the
>> connection
>> + // writing at this point does not need to be synced as the
>> connection
>> // is not shared at this point.
>> if _, err := conn.Write([]byte("BEGIN\r\n")); err != nil {
>> return err
>>
>> === modified file 'dbus.go'
>> --- dbus.go 2014-11-13 10:14:09 +0000
>> +++ dbus.go 2014-11-19 19:15:16 +0000
>> @@ -37,6 +37,7 @@
>> // The unique name of this connection on the message bus.
>> UniqueName string
>> conn net.Conn
>> + writeLock sync.Mutex
>> busProxy BusDaemon
>> lastSerial uint32
>>
>> @@ -251,12 +252,16 @@
>> return atomic.AddUint32(&p.lastSerial, 1)
>> }
>>
>> +func (p *Connection) atomicWriteMessage(msg *Message) error {
>> + p.writeLock.Lock()
>> + defer p.writeLock.Unlock()
>> + _, err := msg.WriteTo(p.conn)
>> + return err
>> +}
>> +
>> func (p *Connection) Send(msg *Message) error {
>> msg.setSerial(p.nextSerial())
>> - if _, err := msg.WriteTo(p.conn); err != nil {
>> - return err
>> - }
>> - return nil
>> + return p.atomicWriteMessage(msg)
>> }
>>
>> func (p *Connection) SendWithReply(msg *Message) (*Message, error) {
>> @@ -272,7 +277,7 @@
>> p.methodCallReplies[serial] = replyChan
>> p.handlerMutex.Unlock()
>>
>> - if _, err := msg.WriteTo(p.conn); err != nil {
>> + if err := p.atomicWriteMessage(msg); err != nil {
>> p.handlerMutex.L...

Read more...

Revision history for this message
John Lenton (chipaca) wrote :

Sigh. Email interface from mobile seems iffy...

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'auth.go'
--- auth.go 2014-10-22 21:48:27 +0000
+++ auth.go 2014-11-19 19:15:16 +0000
@@ -124,7 +124,7 @@
124 inStream := bufio.NewReader(conn)124 inStream := bufio.NewReader(conn)
125 send := func(command ...[]byte) ([][]byte, error) {125 send := func(command ...[]byte) ([][]byte, error) {
126 msg := bytes.Join(command, []byte(" "))126 msg := bytes.Join(command, []byte(" "))
127 // writing at this point doesn't not need to be synced as the connection127 // writing at this point does not need to be synced as the connection
128 // is not shared at this point.128 // is not shared at this point.
129 _, err := conn.Write(append(msg, []byte("\r\n")...))129 _, err := conn.Write(append(msg, []byte("\r\n")...))
130 if err != nil {130 if err != nil {
@@ -182,7 +182,7 @@
182 return errors.New("Could not authenticate with any mechanism")182 return errors.New("Could not authenticate with any mechanism")
183 }183 }
184 // XXX: UNIX FD negotiation would go here.184 // XXX: UNIX FD negotiation would go here.
185 // writing at this point doesn't not need to be synced as the connection185 // writing at this point does not need to be synced as the connection
186 // is not shared at this point.186 // is not shared at this point.
187 if _, err := conn.Write([]byte("BEGIN\r\n")); err != nil {187 if _, err := conn.Write([]byte("BEGIN\r\n")); err != nil {
188 return err188 return err
189189
=== modified file 'dbus.go'
--- dbus.go 2014-11-13 10:14:09 +0000
+++ dbus.go 2014-11-19 19:15:16 +0000
@@ -37,6 +37,7 @@
37 // The unique name of this connection on the message bus.37 // The unique name of this connection on the message bus.
38 UniqueName string38 UniqueName string
39 conn net.Conn39 conn net.Conn
40 writeLock sync.Mutex
40 busProxy BusDaemon41 busProxy BusDaemon
41 lastSerial uint3242 lastSerial uint32
4243
@@ -251,12 +252,16 @@
251 return atomic.AddUint32(&p.lastSerial, 1)252 return atomic.AddUint32(&p.lastSerial, 1)
252}253}
253254
255func (p *Connection) atomicWriteMessage(msg *Message) error {
256 p.writeLock.Lock()
257 defer p.writeLock.Unlock()
258 _, err := msg.WriteTo(p.conn)
259 return err
260}
261
254func (p *Connection) Send(msg *Message) error {262func (p *Connection) Send(msg *Message) error {
255 msg.setSerial(p.nextSerial())263 msg.setSerial(p.nextSerial())
256 if _, err := msg.WriteTo(p.conn); err != nil {264 return p.atomicWriteMessage(msg)
257 return err
258 }
259 return nil
260}265}
261266
262func (p *Connection) SendWithReply(msg *Message) (*Message, error) {267func (p *Connection) SendWithReply(msg *Message) (*Message, error) {
@@ -272,7 +277,7 @@
272 p.methodCallReplies[serial] = replyChan277 p.methodCallReplies[serial] = replyChan
273 p.handlerMutex.Unlock()278 p.handlerMutex.Unlock()
274279
275 if _, err := msg.WriteTo(p.conn); err != nil {280 if err := p.atomicWriteMessage(msg); err != nil {
276 p.handlerMutex.Lock()281 p.handlerMutex.Lock()
277 delete(p.methodCallReplies, serial)282 delete(p.methodCallReplies, serial)
278 p.handlerMutex.Unlock()283 p.handlerMutex.Unlock()
279284
=== modified file 'message.go'
--- message.go 2014-10-22 16:08:20 +0000
+++ message.go 2014-11-19 19:15:16 +0000
@@ -4,7 +4,6 @@
4 "encoding/binary"4 "encoding/binary"
5 "errors"5 "errors"
6 "io"6 "io"
7 "sync"
8)7)
98
10// See the D-Bus tutorial for information about message types.9// See the D-Bus tutorial for information about message types.
@@ -27,8 +26,6 @@
27 TypeError: "error",26 TypeError: "error",
28}27}
2928
30var writeMutex sync.Mutex
31
32func (t MessageType) String() string { return messageTypeString[t] }29func (t MessageType) String() string { return messageTypeString[t] }
3330
34type MessageFlag uint831type MessageFlag uint8
@@ -320,12 +317,8 @@
320 return msg, nil317 return msg, nil
321}318}
322319
323// WriteTo serialises the message and writes it to the given writer.320// WriteTo serialises the message and writes it to the given writer. Not atomic!
324func (p *Message) WriteTo(w io.Writer) (int64, error) {321func (p *Message) WriteTo(w io.Writer) (int64, error) {
325 // message writing needs to be sequential
326 writeMutex.Lock()
327 defer writeMutex.Unlock()
328
329 fields := make([]headerField, 0, 10)322 fields := make([]headerField, 0, 10)
330 if p.Path != "" {323 if p.Path != "" {
331 fields = append(fields, headerField{1, Variant{p.Path}})324 fields = append(fields, headerField{1, Variant{p.Path}})

Subscribers

People subscribed via source and target branches