Merge lp:~charlesk/keeper/service-dbusmock-should-notice-helper-failure into lp:keeper

Proposed by Charles Kerr
Status: Merged
Merge reported by: Charles Kerr
Merged at revision: 57
Proposed branch: lp:~charlesk/keeper/service-dbusmock-should-notice-helper-failure
Merge into: lp:keeper
Diff against target: 563 lines (+212/-146)
4 files modified
tests/com_canonical_keeper.py (+166/-121)
tests/dbusmock/keeper-template-test.cpp (+1/-1)
tests/unit/tar/keeper-tar-create-test.cpp (+1/-1)
tests/utils/keeper-dbusmock-fixture.h (+44/-23)
To merge this branch: bzr merge lp:~charlesk/keeper/service-dbusmock-should-notice-helper-failure
Reviewer Review Type Date Requested Status
Xavi Garcia (community) Approve
Review via email: mp+301711@code.launchpad.net

Commit message

The keeper dbusmock now sets a task's action to 'failed' if its helper fails.

Description of the change

The keeper dbusmock now sets a task's action to 'failed' if its helper fails.

This required some medium-sized changes in how the mock keeps state, but the idea is straightforward:

* Previously the work struct only held backup blob pieces, and it was created when the helper asked for a socket.

* The work struct's role is expanded to monitor the helper process for errors. It now also holds the helper's pid, the action state, and so on (and is renamed TaskData to reflect this bigger role). It's now created for each task when StartBackup() or StartRestore() is called, and is used to build state when com.canonical.keeper.User.State() is called.

In order for the client apps to query the tasks even after the tasks all finish, e.g. to show what succeeded and what failed, we keep TaskData around even after the tasks finish. They're cleared out only when the next call to StartBackup() / StartRestore() is made.

NB: Keeping task data around is something we'll need to do in production's BackupManager as well.

This patch also updates keeper-dbusmock-fixture's wait_for_backup_to_finish() (now renamed wait_for_tasks_to_finish()) to understand that TaskData sticks around after the backup is done.

To post a comment you must log in.
56. By Charles Kerr

sync with trunk

Revision history for this message
Xavi Garcia (xavi-garcia-mena) wrote :

Looks good to me, thanks!

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'tests/com_canonical_keeper.py'
2--- tests/com_canonical_keeper.py 2016-07-28 19:42:00 +0000
3+++ tests/com_canonical_keeper.py 2016-08-02 16:03:31 +0000
4@@ -25,32 +25,28 @@
5
6 SERVICE_PATH = '/com/canonical/keeper'
7 SERVICE_IFACE = 'com.canonical.keeper'
8-USER_PATH = '/com/canonical/keeper/user'
9-USER_IFACE = 'com.canonical.keeper.User'
10-HELPER_PATH = '/com/canonical/keeper/helper'
11-HELPER_IFACE = 'com.canonical.keeper.Helper'
12-MOCK_IFACE = 'com.canonical.keeper.Mock'
13-
14-# magic keys used by dbusmock
15-BUS_NAME = 'com.canonical.keeper'
16-MAIN_IFACE = SERVICE_IFACE
17-MAIN_OBJ = SERVICE_PATH
18-SYSTEM_BUS = False
19-
20-ACTION_QUEUED = 'queued'
21-ACTION_SAVING = 'saving'
22-ACTION_RESTORING = 'restoring'
23+USER_PATH = '/'.join([SERVICE_PATH, 'user'])
24+USER_IFACE = '.'.join([SERVICE_IFACE, 'User'])
25+HELPER_PATH = '/'.join([SERVICE_PATH, 'helper'])
26+HELPER_IFACE = '.'.join([SERVICE_IFACE, 'Helper'])
27+MOCK_IFACE = '.'.join([SERVICE_IFACE, 'Mock'])
28+
29 ACTION_CANCELLED = 'cancelled'
30+ACTION_COMPLETE = 'complete'
31 ACTION_FAILED = 'failed'
32-ACTION_COMPLETE = 'complete'
33+ACTION_RESTORING = 'restoring'
34+ACTION_SAVING = 'saving'
35+ACTION_QUEUED = 'queued'
36
37 KEY_ACTION = 'action'
38+KEY_BLOB = 'blob-data'
39 KEY_CTIME = 'ctime'
40-KEY_BLOB = 'blob-data'
41+KEY_ERROR = 'error'
42 KEY_HELPER = 'helper-exec'
43 KEY_NAME = 'display-name'
44+KEY_PERCENT_DONE = 'percent-done'
45+KEY_SIZE = 'size'
46 KEY_SPEED = 'speed'
47-KEY_SIZE = 'size'
48 KEY_SUBTYPE = 'subtype'
49 KEY_TYPE = 'type'
50 KEY_UUID = 'uuid'
51@@ -59,6 +55,31 @@
52 TYPE_FOLDER = 'folder'
53 TYPE_SYSTEM = 'system-data'
54
55+# magic keys used by dbusmock
56+BUS_NAME = 'com.canonical.keeper'
57+MAIN_IFACE = SERVICE_IFACE
58+MAIN_OBJ = SERVICE_PATH
59+SYSTEM_BUS = False
60+
61+
62+#
63+# classes
64+#
65+
66+
67+class TaskData:
68+ def __init__(self):
69+ self.action = ACTION_QUEUED
70+ self.blob = None
71+ self.n_bytes = 0
72+ self.n_left = 0
73+ self.error = ''
74+ self.chunks = []
75+ self.sock = None
76+ self.uuid = None
77+ self.bytes_per_second = {}
78+
79+
80 #
81 # Utils
82 #
83@@ -80,7 +101,7 @@
84
85 def fail_if_busy():
86 user = mockobject.objects[USER_PATH]
87- if user.all_tasks:
88+ if user.remaining_tasks:
89 fail("can't do that while service is busy")
90
91
92@@ -107,15 +128,20 @@
93
94 def user_start_next_task(user):
95 if not user.remaining_tasks:
96- user.log('last task finished')
97- user.all_tasks = []
98+ user.log('last task finished; setting user.current_task to None')
99 user.current_task = None
100- user.log('setting user.current_task to None')
101 user.update_state_property(user)
102
103 else:
104 uuid = user.remaining_tasks.pop(0)
105 user.current_task = uuid
106+
107+ # update the action state
108+ if user.backup_choices.get(uuid):
109+ action = ACTION_SAVING
110+ else:
111+ action = ACTION_RESTORING
112+ user.task_data[uuid].action = action
113 user.update_state_property(user)
114
115 # find the helper to run
116@@ -124,7 +150,7 @@
117 choice = user.restore_choices.get(uuid)
118 helper_exec = choice.get(KEY_HELPER)
119
120- # build the env that we'll pass to the helper
121+ # build the helper's environment variables
122 henv = {}
123 henv['QDBUS_DEBUG'] = '1'
124 henv['G_DBUS_DEBUG'] = 'call,message,signal,return'
125@@ -133,20 +159,93 @@
126 if val:
127 henv[key] = val
128
129- # set the working directory for folder backups
130- helper_cwd = os.getcwd()
131+ # set the helper's cwd
132 if choice.get(KEY_TYPE) == TYPE_FOLDER:
133 helper_cwd = choice.get(KEY_SUBTYPE)
134+ else:
135+ # FIXME: other types
136+ helper_cwd = os.getcwd()
137
138 # spawn the helper
139 user.log('starting %s for %s, env %s' % (helper_exec, uuid, henv))
140- subprocess.Popen(
141+ user.process = subprocess.Popen(
142 [helper_exec, HELPER_PATH],
143 env=henv, stdout=sys.stdout, stderr=sys.stderr,
144 cwd=helper_cwd,
145 shell=helper_exec.endswith('.sh')
146 )
147
148+ GLib.timeout_add(10, user.periodic_func, user)
149+
150+
151+def user_periodic_func(user):
152+
153+ done = False
154+ got_data_this_pass = False
155+
156+ if not user.process:
157+ fail("bug: user_process_check_func called w/o user.process")
158+
159+ uuid = user.current_task
160+ td = user.task_data[uuid]
161+
162+ # did the helper exit with an error code?
163+ returncode = user.process.poll()
164+ if returncode:
165+ error = 'helper exited with a returncode of %s' % (str(returncode))
166+ user.log(error)
167+ td.error = error
168+ td.action = ACTION_FAILED
169+ done = True
170+
171+ # try to read the socket
172+ if td.sock and not td.error:
173+ chunk = td.sock.recv(4096*2)
174+ chunk_len = len(chunk)
175+ if chunk_len:
176+ got_data_this_pass = True
177+ td.chunks.append(chunk)
178+ td.n_left -= chunk_len
179+ user.log('got %s more bytes; %s left' % (chunk_len, td.n_left))
180+ if td.n_left <= 0:
181+ done = True
182+
183+ # if done, clean up the socket
184+ if done and td.sock:
185+ user.log('cleaning up sock')
186+ td.sock.shutdown(socket.SHUT_RDWR)
187+ td.sock.close()
188+ td.sock = None
189+
190+ # if done successfully, save the blob
191+ if done and not td.error:
192+ user.log('setting blob')
193+ blob = b''.join(td.chunks)
194+ td.blob = blob
195+ user.log('backup %s done; %s bytes' % (uuid, len(blob)))
196+ td.action = ACTION_COMPLETE
197+
198+ # maybe update the task's state
199+ if done or got_data_this_pass:
200+ user.update_state_property(user)
201+
202+ if done:
203+ user.process = None
204+ user.start_next_task(user)
205+
206+ return not done
207+
208+
209+def user_init_tasks(user, uuids):
210+ user.all_tasks = uuids
211+ user.remaining_tasks = copy.copy(uuids)
212+ tds = {}
213+ for uuid in uuids:
214+ td = TaskData()
215+ td.uuid = uuid
216+ tds[uuid] = td
217+ user.task_data = tds
218+
219
220 def user_start_backup(user, uuids):
221
222@@ -156,8 +255,7 @@
223 if uuid not in user.backup_choices:
224 badarg('uuid %s is not a valid backup choice' % (uuid))
225
226- user.all_tasks = uuids
227- user.remaining_tasks = copy.copy(uuids)
228+ user.init_tasks(user, uuids)
229 user.start_next_task(user)
230
231
232@@ -167,11 +265,10 @@
233 fail_if_busy()
234 for uuid in uuids:
235 if uuid not in user.restore_choices:
236- badarg('uuid %s is not a valid backup choice' % (uuid))
237+ badarg('uuid %s is not a valid restore choice' % (uuid))
238
239- user.all_tasks = uuids
240- user.remaining_tasks = copy.copy(uuids)
241- user.start_next_task()
242+ user.init_tasks(user, uuids)
243+ user.start_next_task(user)
244
245
246 def user_cancel(user):
247@@ -191,19 +288,12 @@
248 for uuid in user.all_tasks:
249 task_state = {}
250
251- # get the task's action
252- if uuid == user.current_task:
253- if uuid in user.backup_choices:
254- action = ACTION_SAVING
255- else:
256- action = ACTION_RESTORING
257- elif uuid in user.remaining_tasks:
258- action = ACTION_QUEUED
259- else: # fixme: handle ACTION_CANCELLED, ACTION_FAILED
260- action = ACTION_COMPLETE
261+ # action
262+ td = user.task_data[uuid]
263+ action = td.action
264 task_state[KEY_ACTION] = dbus.String(action)
265
266- # get the task's display-name
267+ # display-name
268 choice = user.backup_choices.get(uuid, None)
269 if not choice:
270 choice = user.restore_choices.get(uuid, None)
271@@ -212,30 +302,34 @@
272 display_name = choice.get(KEY_NAME, None)
273 task_state[KEY_NAME] = dbus.String(display_name)
274
275+ # error
276+ if action == ACTION_FAILED:
277+ task_state[KEY_ERROR] = dbus.String(td.error)
278+
279+ # percent-done
280+ if td.n_bytes:
281+ p = dbus.Double((td.n_bytes - td.n_left) / td.n_bytes)
282+ else:
283+ p = dbus.Double(0.0)
284+ task_state[KEY_PERCENT_DONE] = p
285+
286 # speed
287 helper = mockobject.objects[HELPER_PATH]
288- if (uuid == user.current_task) and helper.work:
289+ if uuid == user.current_task:
290 n_secs = 2
291 n_bytes = 0
292 too_old = time.time() - n_secs
293- for key in helper.work.bytes_per_second:
294+ for key in td.bytes_per_second:
295+ helper.log('key is %s' % (str(key)))
296 if key > too_old:
297- n_bytes += helper.work.bytes_per_second[key]
298+ n_bytes += td.bytes_per_second[key]
299+ helper.log('n_bytes is %s' % (str(n_bytes)))
300 bytes_per_second = n_bytes / n_secs
301 else:
302 bytes_per_second = 0
303 task_state[KEY_SPEED] = dbus.Int32(bytes_per_second)
304
305- # FIXME: use a real percentage here
306- # FIXME: handle ACTION_CANCELLED, ACTION_FAILED
307- if action == ACTION_COMPLETE:
308- percent_done = dbus.Double(1.0)
309- elif action == ACTION_SAVING or action == ACTION_RESTORING:
310- percent_done = dbus.Double(0.5)
311- else:
312- percent_done = dbus.Double(0.0)
313- task_state['percent-done'] = percent_done
314-
315+ # uuid
316 tasks_states[uuid] = dbus.Dictionary(task_state)
317
318 return dbus.Dictionary(
319@@ -256,73 +350,21 @@
320 # Helper Obj
321 #
322
323-class HelperWork:
324- chunks = None
325- n_bytes = None
326- n_left = None
327- sock = None
328- uuid = None
329- bytes_per_second = None
330-
331-
332-def helper_periodic_func(helper):
333-
334- if not helper.work:
335- fail("bug: helper_periodic_func called w/o helper.work")
336-
337- # try to read a bit
338- chunk = helper.work.sock.recv(4096*2)
339- chunk_len = len(chunk)
340- if chunk_len:
341- helper.work.chunks.append(chunk)
342- helper.work.n_left -= chunk_len
343- key = int(time.time())
344- old_n_bytes = helper.work.bytes_per_second.get(key, 0)
345- new_n_bytes = old_n_bytes + chunk_len
346- helper.work.bytes_per_second[key] = new_n_bytes
347- helper.log('got %s bytes; %s left' % (chunk_len, helper.work.n_left))
348-
349- # cleanup if done
350- done = helper.work.n_left <= 0
351- if done:
352- helper.work.sock.shutdown(socket.SHUT_RDWR)
353- helper.work.sock.close()
354- user = mockobject.objects[USER_PATH]
355- user.backup_data[helper.work.uuid] = b''.join(helper.work.chunks)
356- user.log(
357- 'backup %s done; %s bytes' %
358- (helper.work.uuid, len(user.backup_data[helper.work.uuid]))
359- )
360- user.start_next_task(user)
361- helper.work = None
362-
363- if chunk_len or done:
364- user = mockobject.objects[USER_PATH]
365- user.update_state_property(user)
366-
367- return not done
368-
369
370 def helper_start_backup(helper, n_bytes):
371
372 helper.log("got start_backup request for %s bytes" % (n_bytes))
373- if helper.work:
374- fail("can't start a new backup while one's already active")
375
376 parent, child = socket.socketpair()
377
378- # set up helper's workarea
379- work = HelperWork()
380- work.chunks = []
381- work.n_bytes = n_bytes
382- work.n_left = n_bytes
383- work.sock = parent
384- work.uuid = mockobject.objects[USER_PATH].current_task
385- work.bytes_per_second = {}
386- helper.work = work
387-
388- # start checking periodically
389- GLib.timeout_add(10, helper.periodic_func, helper)
390+ user = mockobject.objects[USER_PATH]
391+ uuid = user.current_task
392+
393+ td = user.task_data[uuid]
394+ td.n_bytes = n_bytes
395+ td.n_left = n_bytes
396+ td.sock = parent
397+
398 return dbus.types.UnixFd(child.fileno())
399
400
401@@ -366,9 +408,10 @@
402
403
404 def mock_get_backup_data(mock, uuid):
405- blob = mockobject.objects[USER_PATH].backup_data[uuid]
406- mock.log('returning %s byte blob for uuid %s' % (len(blob), uuid))
407- return blob
408+ user = mockobject.objects[USER_PATH]
409+ td = user.task_data[uuid]
410+ user.log('returning %s byte blob for uuid %s' % (len(td.blob), uuid))
411+ return td.blob
412
413
414 #
415@@ -384,6 +427,7 @@
416 main.AddObject(path, USER_IFACE, {}, [])
417 o = mockobject.objects[path]
418 o.get_backup_choices = user_get_backup_choices
419+ o.init_tasks = user_init_tasks
420 o.start_backup = user_start_backup
421 o.get_restore_choices = user_get_restore_choices
422 o.start_restore = user_start_restore
423@@ -393,10 +437,13 @@
424 o.start_next_task = user_start_next_task
425 o.all_tasks = []
426 o.remaining_tasks = []
427+ o.task_data = {}
428 o.backup_data = {}
429 o.backup_choices = parameters.get('backup-choices', {})
430 o.restore_choices = parameters.get('restore-choices', {})
431 o.current_task = None
432+ o.process = None
433+ o.periodic_func = user_periodic_func
434 o.defined_types = [TYPE_APP, TYPE_SYSTEM, TYPE_FOLDER]
435 o.AddMethods(USER_IFACE, [
436 ('GetBackupChoices', '', 'a{sa{sv}}',
437@@ -418,8 +465,6 @@
438 o = mockobject.objects[path]
439 o.start_backup = helper_start_backup
440 o.start_restore = helper_start_restore
441- o.periodic_func = helper_periodic_func
442- o.work = None
443 o.AddMethods(HELPER_IFACE, [
444 ('StartBackup', 't', 'h',
445 'ret = self.start_backup(self, args[0])'),
446
447=== modified file 'tests/dbusmock/keeper-template-test.cpp'
448--- tests/dbusmock/keeper-template-test.cpp 2016-07-28 19:26:33 +0000
449+++ tests/dbusmock/keeper-template-test.cpp 2016-08-02 16:03:31 +0000
450@@ -151,7 +151,7 @@
451 // start the backup
452 QDBusReply<void> reply = user_iface_->call("StartBackup", QStringList{uuid});
453 EXPECT_TRUE(reply.isValid()) << qPrintable(reply.error().message());
454- wait_for_backup_to_finish();
455+ ASSERT_TRUE(wait_for_tasks_to_finish());
456
457 // ask keeper for the blob
458 QDBusReply<QByteArray> blob = mock_iface_->call(QStringLiteral("GetBackupData"), uuid);
459
460=== modified file 'tests/unit/tar/keeper-tar-create-test.cpp'
461--- tests/unit/tar/keeper-tar-create-test.cpp 2016-07-28 19:26:33 +0000
462+++ tests/unit/tar/keeper-tar-create-test.cpp 2016-08-02 16:03:31 +0000
463@@ -51,7 +51,7 @@
464 // start the backup
465 QDBusReply<void> reply = user_iface_->call("StartBackup", QStringList{uuid});
466 ASSERT_TRUE(reply.isValid()) << qPrintable(reply.error().message());
467- wait_for_backup_to_finish();
468+ wait_for_tasks_to_finish();
469
470 // ask keeper for the blob
471 QDBusReply<QByteArray> blob = mock_iface_->call(QStringLiteral("GetBackupData"), uuid);
472
473=== modified file 'tests/utils/keeper-dbusmock-fixture.h'
474--- tests/utils/keeper-dbusmock-fixture.h 2016-07-28 19:42:00 +0000
475+++ tests/utils/keeper-dbusmock-fixture.h 2016-08-02 16:03:31 +0000
476@@ -33,19 +33,21 @@
477
478 #include <memory>
479
480-#define KEY_ACTION "action"
481-#define KEY_CTIME "ctime"
482-#define KEY_BLOB "blob-data"
483-#define KEY_HELPER "helper-exec"
484-#define KEY_NAME "display-name"
485-#define KEY_PERCENT "percent-done"
486-#define KEY_SIZE "size"
487-#define KEY_SPEED "speed"
488-#define KEY_SUBTYPE "subtype"
489-#define KEY_TYPE "type"
490-#define KEY_UUID "uuid"
491-
492-// FIXME: this should go in a shared header in include/
493+// FIXME: these should go in a shared header in include/
494+
495+static constexpr char const * KEY_ACTION = {"action"};
496+static constexpr char const * KEY_CTIME = {"ctime"};
497+static constexpr char const * KEY_BLOB = {"blob-data"};
498+static constexpr char const * KEY_ERROR = {"error"};
499+static constexpr char const * KEY_HELPER = {"helper-exec"};
500+static constexpr char const * KEY_NAME = {"display-name"};
501+static constexpr char const * KEY_PERCENT = {"percent-done"};
502+static constexpr char const * KEY_SIZE = {"size"};
503+static constexpr char const * KEY_SPEED = {"speed"};
504+static constexpr char const * KEY_SUBTYPE = {"subtype"};
505+static constexpr char const * KEY_TYPE = {"type"};
506+static constexpr char const * KEY_UUID = {"uuid"};
507+
508 static constexpr char const * ACTION_QUEUED = {"queued"};
509 static constexpr char const * ACTION_SAVING = {"saving"};
510 static constexpr char const * ACTION_RESTORING = {"restoring"};
511@@ -113,23 +115,42 @@
512 std::unique_ptr<DBusInterfaceKeeperUser> user_iface_;
513 std::unique_ptr<QDBusInterface> mock_iface_;
514
515- void EXPECT_EVENTUALLY(std::function<bool()>&& test, qint64 timeout_msec=5000)
516+ bool wait_for(
517+ std::function<bool()>&& test_function,
518+ qint64 timeout_msec=1000,
519+ qint64 test_interval=100)
520 {
521 QElapsedTimer timer;
522 timer.start();
523 bool passed;
524- do {
525- passed = test();
526- if (!passed)
527- QThread::msleep(100);
528- } while (!passed && !timer.hasExpired(timeout_msec));
529- EXPECT_TRUE(passed);
530+ for(;;) {
531+ if (test_function())
532+ return true;
533+ if (timer.hasExpired(timeout_msec))
534+ return false;
535+ QThread::msleep(test_interval);
536+ }
537 }
538
539- void wait_for_backup_to_finish()
540+ bool wait_for_tasks_to_finish()
541 {
542- EXPECT_EVENTUALLY([this]{return !user_iface_->state().isEmpty();}); // backup running
543- EXPECT_EVENTUALLY([this]{return user_iface_->state().isEmpty();}); // backup finished
544+ auto tasks_exist = [this]{
545+ return !user_iface_->state().isEmpty();
546+ };
547+
548+ auto all_tasks_finished = [this]{
549+ const auto state = user_iface_->state();
550+ bool all_done = true;
551+ for(const auto& properties : state) {
552+ const auto action = properties.value(KEY_ACTION);
553+ bool task_done = (action == ACTION_CANCELLED) || (action == ACTION_FAILED) || (action == ACTION_COMPLETE);
554+ if (!task_done)
555+ all_done = false;
556+ }
557+ return all_done;
558+ };
559+
560+ return wait_for(tasks_exist) && wait_for(all_tasks_finished,5000);
561 }
562
563 QString add_backup_choice(const QMap<QString,QVariant>& properties)

Subscribers

People subscribed via source and target branches

to all changes: