Merge lp:~codehelp/lava-dispatcher/multinode into lp:lava-dispatcher/multinode

Proposed by Neil Williams
Status: Merged
Approved by: Neil Williams
Approved revision: 684
Merged at revision: 659
Proposed branch: lp:~codehelp/lava-dispatcher/multinode
Merge into: lp:lava-dispatcher/multinode
Diff against target: 121 lines (+27/-22)
1 file modified
lava/dispatcher/group.py (+27/-22)
To merge this branch: bzr merge lp:~codehelp/lava-dispatcher/multinode
Reviewer                        Review Type  Date Requested  Status
Fu Wei                                                       Approve
Linaro Automation & Validation                               Pending
Review via email: mp+174612@code.launchpad.net

Description of the change

This is the rework of the group key=value pair handling done at Connect; it complements Fu's change for wait_all.

This results in:

<LAVA_WAIT_ALL_DEBUG lava_multi_node_send preparing Sun Jul 14 13:00:04 UTC 2013>
<LAVA_WAIT_ALL_DEBUG _lava_multi_node_send started Sun Jul 14 13:00:04 UTC 2013>
<LAVA_MULTI_NODE> <LAVA_WAIT_ALL get_source client>
<LAVA_DISPATCHER>2013-07-14 02:00:04 PM DEBUG: Received Multi_Node API <LAVA_WAIT_ALL>
<LAVA_DISPATCHER>2013-07-14 02:00:04 PM DEBUG: Handling signal <LAVA_WAIT_ALL get_source>
<LAVA_DISPATCHER>2013-07-14 02:00:04 PM DEBUG: transport handler for NodeDispatcher {"role": "client", "request": "lava_wait_all", "messageID": "get_source"}
<LAVA_DISPATCHER>2013-07-14 02:00:04 PM INFO: requesting lava_wait_all {"request": "lava_wait_all", "role": "client", "messageID": "get_source"}
<LAVA_DISPATCHER>2013-07-14 02:00:04 PM DEBUG: sending Message {"client_name": "kvm01", "hostname": "sylvester", "request": "lava_wait_all", "group_name": "group01", "host": "sylvester", "role": "client", "messageID": "get_source", "port": 3079}
<LAVA_DISPATCHER>2013-07-14 02:00:04 PM DEBUG: polling {"client_name": "kvm01", "hostname": "sylvester", "group_name": "group01", "host": "sylvester", "role": "client", "port": 3079}
<LAVA_DISPATCHER>2013-07-14 02:00:04 PM DEBUG: socket created for host:sylvester port:3079
<LAVA_DISPATCHER>2013-07-14 02:00:04 PM DEBUG: read message: {"client_name": "kvm01", "hostname": "sylvester", "request": "lava_wait_all", "group_name": "group01", "host": "sylvester", "role": "client", "messageID": "get_source", "port": 3079}
<LAVA_DISPATCHER>2013-07-14 02:00:04 PM INFO: Response: ack
<LAVA_DISPATCHER>2013-07-14 02:00:04 PM DEBUG: Node transport replied with {u'kvm01': {u'source': u'kvm01', u'role': u'client'}, u'kvm02': {u'source': u'kvm02', u'role': u'client'}}
<LAVA_DISPATCHER>2013-07-14 02:00:04 PM DEBUG: sendline : <LAVA_WAIT_ALL_COMPLETE kvm01:source=kvm01 kvm01:role=client kvm02:source=kvm02 kvm02:role=client>
<LAVA_DISPATCHER>2013-07-14 02:00:04 PM DEBUG: send : <LAVA_WAIT_ALL_COMPLETE kvm01:source=kvm01 kvm01:role=client kvm02:source=kvm02 kvm02:role=client>
<LAVA_DISPATCHER>2013-07-14 02:00:09 PM DEBUG: send :

The output sent from the GroupDispatcher is:
{
    "kvm01": {
        "source": "kvm01",
        "role": "client"
    },
    "kvm02": {
        "source": "kvm02",
        "role": "client"
    }
}

This converts to a message of:
kvm01:source=kvm01 kvm01:role=client kvm02:source=kvm02 kvm02:role=client
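
The conversion above can be sketched in Python as follows. This is a minimal illustration that mirrors the nested loop in the signal handler, not the dispatcher code itself: the function name format_wait_message is hypothetical, and sorting the keys is an assumption made here only to get a reproducible order (the patch iterates in plain dict order).

```python
def format_wait_message(reply):
    """Flatten {target: {key: value}} into ' target:key=value ...'."""
    message_str = ""
    # sorted() is an assumption for stable output; the patch uses dict order
    for target, messages in sorted(reply.items()):
        for key, value in sorted(messages.items()):
            message_str += " %s:%s=%s" % (target, key, value)
    return message_str


reply = {
    "kvm01": {"source": "kvm01", "role": "client"},
    "kvm02": {"source": "kvm02", "role": "client"},
}
# the leading space in message_str separates the signal name from the data
print("<LAVA_WAIT_ALL_COMPLETE%s>" % format_wait_message(reply))
```

Prefixing each pair with the target name keeps values from different devices apart, which a flat key=value list could not do when two devices send the same key.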

Neil Williams (codehelp) wrote :

There's a parallel change in signals/__init__.py which goes along with this change:

=== modified file 'lava_dispatcher/signals/__init__.py'
--- lava_dispatcher/signals/__init__.py 2013-07-10 10:36:36 +0000
+++ lava_dispatcher/signals/__init__.py 2013-07-16 08:14:35 +0000
@@ -207,8 +207,11 @@
         reply = self.context.transport(json.dumps(msg))
         logging.debug("Node transport replied with %s" % reply)
         message_str = ""
-        for key, value in reply[0].items():
-            message_str += " %s=%s" % (key, value)
+        for target, messages in reply.items():
+            for key, value in messages.items():
+                message_str += " %s:%s=%s" % (target, key, value)
+#        for key, value in reply[0].items():
+#            message_str += " %s=%s" % (key, value)
         self.connection.sendline("<LAVA_WAIT_COMPLETE%s>" % message_str)

     def _on_WAIT_ALL(self, message_id, role=None):

With this change, lava-wait gives this kind of output:

<LAVA_DISPATCHER>2013-07-16 08:37:18 AM DEBUG: Node transport replied with
{
    "panda21": {
        "ip": "192.168.23.11",
        "hostname": "panda21.localdomain"
    },
    "multinode-arndale01": {
        "ip": "192.168.1.79"
    },
    "panda19": {
        "ip": "192.168.127.216",
        "hostname": "panda19.localdomain"
    }
}

<LAVA_DISPATCHER>2013-07-16 08:37:18 AM DEBUG: send : <LAVA_WAIT_COMPLETE
panda21:ip=192.168.23.11
panda21:hostname=panda21.localdomain

multinode-arndale01:ip=192.168.1.79

panda19:ip=192.168.127.216
panda19:hostname=panda19.localdomain
>
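
For illustration, the single-line payload can be turned back into the nested structure as sketched below. The helper name parse_wait_message is hypothetical and this is not how the target-side scripts are implemented; it assumes that no target name contains ':' and that no key contains '='.

```python
def parse_wait_message(payload):
    """Rebuild {target: {key: value}} from 'target:key=value ...'."""
    result = {}
    for item in payload.split():
        # split on the first ':' only, then on the first '=' only
        target, _, pair = item.partition(":")
        key, _, value = pair.partition("=")
        result.setdefault(target, {})[key] = value
    return result


payload = ("panda21:ip=192.168.23.11 panda21:hostname=panda21.localdomain "
           "multinode-arndale01:ip=192.168.1.79")
data = parse_wait_message(payload)
# a device that sent no hostname simply has no 'hostname' key
print(sorted(data["multinode-arndale01"].keys()))
```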

Neil Williams (codehelp) wrote :

(I formatted the output manually; the actual output in each case is on a single line.)

Neil Williams (codehelp) wrote :

(For whatever reason, the arndale does not currently support hostname -f, so the FQDN is not available, and the code omits any key which has no value.)

Fu Wei (fu-wei) wrote :

I have tested it and it works fine. Good patch!

review: Approve

Preview Diff

=== modified file 'lava/dispatcher/group.py'
--- lava/dispatcher/group.py 2013-07-05 11:48:41 +0000
+++ lava/dispatcher/group.py 2013-07-14 13:26:24 +0000
@@ -20,7 +20,6 @@
 import json
 import time
 import socket
-import copy


 class GroupDispatcher(object):
@@ -88,7 +87,6 @@
             logging.error("Missing client_name in request: %s" % json_data)
             return None
         if json_data['group_name'] not in self.all_groups:
-            print json.dumps(json_data)
             if "group_size" not in json_data or json_data["group_size"] == 0:
                 logging.error('%s asked for a new group %s without specifying the size of the group'
                               % (client_name, json_data['group_name']))
@@ -151,7 +149,8 @@
             self._badRequest()
             return
         logging.info("Sending messageID '%s' to %s in group %s: %s" %
-                     (messageID, client_name, self.group['group'], json.dumps(self.group['messages'][client_name][messageID])))
+                     (messageID, client_name, self.group['group'],
+                      json.dumps(self.group['messages'][client_name][messageID])))
         msg = {"response": "ack", "message": self.group['messages'][client_name][messageID]}
         logging.info("Sending response to %s in group %s: %s" %
                      (client_name, self.group['group'], json.dumps(msg)))
@@ -192,13 +191,15 @@
         Global synchronization primitive. Sends a message and waits for the same
         message from all of the other devices.
         """
-        logging.debug("GroupDispatcher:lavaSync %s from %s in group %s" %(json.dumps(json_data), client_name, self.group['group']))
+        logging.debug("GroupDispatcher:lavaSync %s from %s in group %s" %
+                      (json.dumps(json_data), client_name, self.group['group']))
         messageID = self._getMessageID(json_data)
         message = self._getMessage(json_data)
         # FIXME: in _sendMessage, be sure to send the messageID if message is empty
         if not message:
             message = messageID
-        logging.info("LavaSync request for '%s' at stage '%s' in group '%s'" % (client_name, messageID, self.group['group']))
+        logging.info("LavaSync request for '%s' at stage '%s' in group '%s'" %
+                     (client_name, messageID, self.group['group']))
         self.group['syncs'].setdefault(messageID, {})
         self.group['messages'].setdefault(client_name, {}).setdefault(messageID, {})
         if len(self.group['syncs'][messageID]) >= self.group['count']:
@@ -230,21 +231,13 @@
         """
         messageID = self._getMessageID(json_data)
         if 'role' in json_data:
-            role_msg = {}
+            logging.debug("setting message: %s for %s" % (self.group['messages'][client_name][messageID], client_name))
             for client in self.group['roles'][json_data['role']]:
                 if client not in self.group['messages'] or messageID not in self.group['messages'][client]:
                     self._waitResponse()
                     return
-                # combine all messages for this messageID into a single message for the entire role.
-                role_msg[client] = copy.deepcopy(self.group['messages'][client][messageID])
-            msg = {}
-            # build a single structure with all of the data for all clients in the role
-            for client in self.group['roles'][json_data['role']]:
-                msg[client] = role_msg[client]
-            del self.group['messages'][client_name][messageID]
-            # now put all of the data in msg into the messageID for all clients with this role
-            for client in self.group['roles'][json_data['role']]:
-                self.group['messages'][client][messageID] = copy.deepcopy(msg)
+                logging.debug("broadcasting: %s for %s" % (self.group['messages'][client][messageID], client))
+            logging.debug("lavaWaitAll message: %s" % json.dumps(self.group['messages'][client_name][messageID]))
         else:
             for client in self.group['clients']:
                 if client not in self.group['messages'] or messageID not in self.group['messages'][client]:
@@ -266,7 +259,7 @@
             return
         self._sendMessage(client_name, messageID)

-    def lavaSend(self, json_data):
+    def lavaSend(self, json_data, client_name):
         """
         A message list won't be seen by the destination until the destination
         calls lava_wait or lava_wait_all with the messageID
@@ -274,14 +267,26 @@
         """
         message = self._getMessage(json_data)
         messageID = self._getMessageID(json_data)
-        logging.info("lavaSend handler in GroupDispatcher received a messageID '%s' for group '%s'"
-                     % (messageID, self.group['group']))
+        logging.info("lavaSend handler in GroupDispatcher received a messageID '%s' for group '%s' from %s"
+                     % (messageID, self.group['group'], client_name))
+        if client_name not in self.group['messages']:
+            self.group['messages'][client_name] = {}
+        # construct the message hash which stores the data from each client separately
+        # but which gets returned as a complete hash upon request
+        msg_hash = {}
+        msg_hash.update({client_name: message})
+        # always set this client data if the call is made to update the broadcast
+        if messageID not in self.group['messages'][client_name]:
+            self.group['messages'][client_name][messageID] = {}
+        self.group['messages'][client_name][messageID].update(msg_hash)
+        logging.debug("message %s for %s" % (json.dumps(self.group['messages'][client_name][messageID]), client_name))
         for client in self.group['clients']:
             if client not in self.group['messages']:
                 self.group['messages'][client] = {}
             if messageID not in self.group['messages'][client]:
-                self.group['messages'][client][messageID] = []
-            self.group['messages'][client][messageID].append(message)
+                self.group['messages'][client][messageID] = {}
+            self.group['messages'][client][messageID].update(msg_hash)
+            logging.debug("broadcast %s for %s" % (json.dumps(self.group['messages'][client][messageID]), client))
         self._ackResponse()

     def dataReceived(self, json_data):
@@ -315,7 +320,7 @@
             self.lavaWait(json_data, client_name)
         elif request == 'lava_send':
             logging.info("lava_send: %s" % json_data)
-            self.lavaSend(json_data)
+            self.lavaSend(json_data, client_name)
         elif request == "complete":
             logging.info("dispatcher communication for '%s' in group '%s' is complete, closing." %
                          (client_name, self.group['group']))
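
To make the data model in the reworked lavaSend easier to follow, here is a minimal standalone sketch. It is not the GroupDispatcher code: the function lava_send and the plain group dict are stand-ins, and sockets, logging and the ack response are left out. It shows each sender's data stored under its own client name, so that every client's entry for a messageID accumulates a hash keyed by sender.

```python
def lava_send(group, client_name, message_id, message):
    """Record `message` from `client_name` for every client in the group."""
    msg_hash = {client_name: message}
    # cover the sender as well, in case it is not yet listed in 'clients'
    for client in set(group["clients"]) | {client_name}:
        per_client = group["messages"].setdefault(client, {})
        per_client.setdefault(message_id, {}).update(msg_hash)


group = {"clients": ["kvm01", "kvm02"], "messages": {}}
lava_send(group, "kvm01", "get_source", {"source": "kvm01", "role": "client"})
lava_send(group, "kvm02", "get_source", {"source": "kvm02", "role": "client"})

# every client now holds the data from both senders, keyed by sender name
print(group["messages"]["kvm01"]["get_source"])
```

Because the entries are merged with update(), a second send from the same client with the same messageID replaces that client's earlier data instead of appending to a list, as the old code did.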
