Merge ~sylvain-pineau/checkbox-ng:mini-me into checkbox-ng:remote-api-bump

Proposed by Sylvain Pineau
Status: Work in progress
Proposed branch: ~sylvain-pineau/checkbox-ng:mini-me
Merge into: checkbox-ng:remote-api-bump
Diff against target: 481 lines (+366/-14)
6 files modified
checkbox_ng/launcher/checkbox_cli.py (+2/-0)
checkbox_ng/launcher/mini_me.py (+327/-0)
plainbox/impl/execution.py (+7/-0)
plainbox/impl/session/assistant.py (+6/-0)
plainbox/impl/session/remote_assistant.py (+23/-13)
plainbox/impl/session/restart.py (+1/-1)
Reviewer                          Review Type    Date Requested    Status
Maciej Kisielewski (community)                                     Needs Fixing
Taihsiang Ho                                                       Pending
Review via email: mp+415611@code.launchpad.net

Description of the change

Adds the checkbox-cli mini-me command (see the commits for details).

How to test?

The standalone command (quick and easy):

$ checkbox-cli mini-me --host 192.168.1.33 com.canonical.certification::usb/detect

Nested inside an existing checkbox remote/service session:

1. Prepare the following launcher (adjust the target host IP, of course):

    [launcher]
    app_id = com.canonical.certification:checkbox-test
    launcher_version = 1
    stock_reports = text, submission_files

    [test plan]
    unit = com.canonical.certification::demo
    forced = yes

    [test selection]
    forced = yes

    [ui]
    output = hide-resource-and-attachment

    [environment]
    STRESS_BOOT_ITERATIONS=2
    STRESS_BOOT_WAIT_DELAY=10
    STRESS_BOOT_WAKEUP_DELAY=15
    TARGET_HOST=192.168.1.33

2. A small test plan like this:

    id: demo
    unit: test plan
    _name: demo tests
    _description:
     demo tests
    include:
     demo/usb/detect

3. A demo job calling the new command:

    plugin: shell
    category_id: com.canonical.plainbox::usb
    id: demo/usb/detect
    command:
     checkbox-cli mini-me com.canonical.certification::usb/detect
    _summary: Display USB devices attached to SUT
    _description: Detects and shows USB devices attached to this system.

4. Install checkbox-ng and the checkbox provider from the dev PPA on a spare system.

5. From your laptop, start a checkbox service after adding the test plan and the new job to one of your local providers (side-loading a provider is a good idea):

    sudo checkbox-cli service

6. Finally execute your launcher file with:

    checkbox-cli master 127.0.0.1 ./mylauncher

If everything goes well, the usb/detect output should list the devices attached to your spare system.

Maciej Kisielewski (kissiel) wrote :

Awesome stuff.

Is this based on the remote-api-bump branch? Some changes landed there that may conflict with this.

I think we need a way to mark a session as a "mini-me" session. The reason is that if someone started a session with the normal remote, connecting to it will result in the MiniMe controller picking up the execution with no way of disconnecting (interrupt is brute force) and with different handling of things. MiniMe should be able to connect only to idle services. It should also hold a global lock on the service (with a timeout); those sessions should be short-lived, as I understand it. Disconnecting the controller should also terminate this ephemeral session (unless we want to support reboots as part of mini-me, but I doubt it, as we want this to be an ephemeral, stateless thing).

There are a few bits below that need fixing, mostly because the new code is heavily based on the existing controller.

Taking into consideration some of the things I noted here, I think the whole thing can be greatly simplified.

review: Needs Fixing
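
A minimal sketch of the idle-only rule suggested in the comment above, assuming the state strings used by the dispatch table in connect_and_run ('idle', 'running', and so on). ensure_idle is a hypothetical helper, not part of this branch or of Checkbox; the global lock with a timeout is not covered here.

```python
def ensure_idle(state):
    """Raise SystemExit unless the service reports the 'idle' state.

    Hypothetical guard: 'state' is assumed to be the first element of the
    tuple returned by whats_up() in the proposed diff.
    """
    if state != 'idle':
        raise SystemExit(
            "Service is busy (state: {}); mini-me only connects to idle "
            "services".format(state))
    return True


# An idle service is accepted; any other state aborts the command.
accepted = ensure_idle('idle')
try:
    ensure_idle('running')
    rejected = False
except SystemExit:
    rejected = True
```

With a guard like this, the dispatch table would only need the 'idle' entry, which is one way the controller code could be simplified.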
Maciej Kisielewski (kissiel) wrote :

I also think we should s/slave/service and s/master/controller wherever possible, so it's less work in the future, and also to enforce good habits.

Maciej Kisielewski (kissiel) wrote :

I also don't think that stuff like:

    [test selection]
    forced = yes

should be required. There's no other option to run it, so it should be implied.

Unmerged commits

e1314da... by Sylvain Pineau

mini-me: new checkbox-cli command

The "remote" version of the "local" `checkbox-cli run` command.

Supported pattern:
- Only a single job id is supported
- Only automated tests, no manual or semi-auto plugins
- Test plan ids are rejected

Selection of the target host:
- Target machine IP can be provided with either --host or the env var TARGET_HOST
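
A minimal sketch of the host selection rule above: an argparse option that falls back to an environment variable and is required only when the variable is unset. It mirrors the environ_or_required helper from the diff; build_parser and the explicit environ parameter are assumptions added so the example runs on its own.

```python
import argparse
import os


def environ_or_required(key, environ=None):
    """Return argparse kwargs: a default from the environment, or required."""
    environ = os.environ if environ is None else environ
    value = environ.get(key)
    if value:
        return {'default': value}
    return {'required': True}


def build_parser(environ=None):
    # Hypothetical helper, not part of the branch.
    parser = argparse.ArgumentParser(prog='mini-me-sketch')
    parser.add_argument('--host', help="target host",
                        **environ_or_required('TARGET_HOST', environ))
    parser.add_argument('--port', type=int, default=18871)
    parser.add_argument('PATTERN')
    return parser


# TARGET_HOST set: --host can be omitted.
from_env = build_parser({'TARGET_HOST': '192.168.1.33'}).parse_args(
    ['com.canonical.certification::usb/detect'])
# TARGET_HOST unset: --host has to be given explicitly.
explicit = build_parser({}).parse_args(
    ['--host', '10.0.0.2', 'com.canonical.certification::usb/detect'])
```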

Standard streams:
- Remote stdout/stderr are printed locally on the same streams
- No colorizer
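
A minimal sketch of the stream routing described above, assuming (as wait_for_job in the diff suggests) that each payload line carries a six-character 'stdout' or 'stderr' prefix. route_payload and its out/err parameters are hypothetical names used only for this illustration.

```python
import io
import sys


def route_payload(payload, out=None, err=None):
    """Print each prefixed payload line on the matching local stream."""
    out = sys.stdout if out is None else out
    err = sys.stderr if err is None else err
    for line in payload.splitlines():
        # Both prefixes are six characters long, so line[6:] drops either.
        target = err if line.startswith('stderr') else out
        print(line[6:], file=target)


# In-memory streams stand in for the local stdout and stderr.
out, err = io.StringIO(), io.StringIO()
route_payload('stdoutBus 001 Device 002\nstderrwarning: no hub', out, err)
```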

Session management:
- Ephemeral sessions on service side, removed after job execution
- launcher settings are transferred to the TARGET_HOST

617c9ac... by Sylvain Pineau

session:remote_assistant: Expose the hand_pick_jobs() method

71bfd7b... by Sylvain Pineau

session:assistant: add a method to save the launcher file in the session dir

save_launcher_file is also exposed over rpyc

4010421... by Sylvain Pineau

execution.py: Set a new env var PLAINBOX_SESSION_DIR

Full path to the current session directory
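
A minimal sketch of how a nested command could pick up the launcher through this variable, following what new_session does in the diff (look for a file named 'launcher' in PLAINBOX_SESSION_DIR, falling back to /tmp). read_launcher and the environ parameter are assumptions made for the example.

```python
import os
import tempfile


def read_launcher(environ=None):
    """Return the launcher text from the session directory, or '' if absent."""
    environ = os.environ if environ is None else environ
    session_dir = environ.get('PLAINBOX_SESSION_DIR', '/tmp')
    launcher_file = os.path.join(session_dir, 'launcher')
    if not os.path.isfile(launcher_file):
        return ''
    with open(launcher_file, 'rt', encoding='UTF-8') as f:
        return f.read()


# A temporary directory stands in for the real session directory.
with tempfile.TemporaryDirectory() as session_dir:
    path = os.path.join(session_dir, 'launcher')
    with open(path, 'wt', encoding='UTF-8') as f:
        f.write('[environment]\nTARGET_HOST=192.168.1.33\n')
    launcher_text = read_launcher({'PLAINBOX_SESSION_DIR': session_dir})
    missing_text = read_launcher(
        {'PLAINBOX_SESSION_DIR': os.path.join(session_dir, 'missing')})
```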

3ef87c1... by Sylvain Pineau

execution.py: Set PYTHONUNBUFFERED for all child processes
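
A minimal sketch of the effect, assuming only the standard library: the variable is added to the environment handed to a child process, and a Python child then runs with unbuffered stdout and stderr. The child command here is a stand-in for a job command, not Checkbox code.

```python
import os
import subprocess
import sys

# Copy the current environment and force unbuffered streams in children.
env = dict(os.environ)
env['PYTHONUNBUFFERED'] = "1"

# The child simply reports the value it inherited.
child = subprocess.run(
    [sys.executable, '-c',
     'import os; print(os.environ["PYTHONUNBUFFERED"])'],
    env=env, stdout=subprocess.PIPE, universal_newlines=True, check=True)
seen_by_child = child.stdout.strip()
```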

c865d8f... by Sylvain Pineau

fix: restart - Disable the checkbox-ng.service if __respawn_checkbox is used

Disable the service provided by checkbox-ng if another mechanism handles
the session restart.
e.g. pm_test currently manages several reboots in a single job.
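
A minimal sketch of the flipped condition from the restart.py hunk, written as a pure function so it can be checked without systemd. The function name, the 'checkbox-ng.service' default, and the sample command are assumptions for illustration; only the comparison itself comes from the diff.

```python
def systemctl_calls(cmd, service_name='checkbox-ng.service'):
    """Return the systemctl invocations implied by the fixed condition."""
    # After the fix, the service is disabled only when the restart command
    # is something other than the service itself.
    if cmd != service_name:
        return [['systemctl', 'disable', service_name]]
    return []


# Hypothetical custom restart command: the service gets disabled.
custom = systemctl_calls('/usr/local/bin/respawn-sketch')
# The restart is handled by the service itself: nothing is disabled.
default = systemctl_calls('checkbox-ng.service')
```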

Preview Diff

diff --git a/checkbox_ng/launcher/checkbox_cli.py b/checkbox_ng/launcher/checkbox_cli.py
index 9740eb8..0ae7c14 100644
--- a/checkbox_ng/launcher/checkbox_cli.py
+++ b/checkbox_ng/launcher/checkbox_cli.py
@@ -38,6 +38,7 @@ from checkbox_ng.launcher.check_config import CheckConfig
 from checkbox_ng.launcher.merge_reports import MergeReports
 from checkbox_ng.launcher.merge_submissions import MergeSubmissions
 from checkbox_ng.launcher.master import RemoteMaster
+from checkbox_ng.launcher.mini_me import RemoteMe
 from checkbox_ng.launcher.slave import RemoteSlave


@@ -67,6 +68,7 @@ def main():
         'tp-export': TestPlanExport,
         'service': RemoteSlave,
         'remote': RemoteMaster,
+        'mini-me': RemoteMe,
     }
     deprecated_commands = {
         'slave': 'service',
diff --git a/checkbox_ng/launcher/mini_me.py b/checkbox_ng/launcher/mini_me.py
new file mode 100644
index 0000000..266ed80
--- /dev/null
+++ b/checkbox_ng/launcher/mini_me.py
@@ -0,0 +1,327 @@
+# This file is part of Checkbox.
+#
+# Copyright 2022 Canonical Ltd.
+# Written by:
+#   Sylvain Pineau <sylvain.pineau@canonical.com>
+#
+# Checkbox is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 3,
+# as published by the Free Software Foundation.
+#
+# Checkbox is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Checkbox. If not, see <http://www.gnu.org/licenses/>.
+"""
+This module contains implementation of the mini me end of the remote execution
+functionality.
+"""
+import contextlib
+import ipaddress
+import json
+import logging
+import os
+import select
+import socket
+import time
+import sys
+
+from functools import partial
+
+from plainbox.impl.session.remote_assistant import RemoteSessionAssistant
+from plainbox.vendor import rpyc
+from checkbox_ng.launcher.stages import MainLoopStage
+from checkbox_ng.launcher.stages import ReportsStage
+from tqdm import tqdm
+_logger = logging.getLogger("mini-me")
+
+
+def environ_or_required(key):
+    """Mapping for argparse to supply required or default from $ENV."""
+    if os.environ.get(key):
+        return {'default': os.environ.get(key)}
+    else:
+        return {'required': True}
+
+
+class RemoteMe(ReportsStage, MainLoopStage):
+    """
+    Control remote slave instance
+
+    This class implements the part that presents UI to the operator and
+    steers the session.
+    """
+
+    name = 'remote-me'
+
+    @property
+    def is_interactive(self):
+        return False
+
+    @property
+    def C(self):
+        return None
+
+    @property
+    def sa(self):
+        return self._sa
+
+    def invoked(self, ctx):
+        self._launcher_text = ''
+        self._is_bootstrapping = False
+        self._target_host = ctx.args.host
+        self._normal_user = ''
+        timeout = 600
+        deadline = time.time() + timeout
+        port = ctx.args.port
+        self.selection = [ctx.args.PATTERN]
+        if not ipaddress.ip_address(ctx.args.host).is_loopback:
+            _logger.info("Connecting to {}:{}. Timeout: {}s".format(
+                ctx.args.host, port, timeout))
+        while time.time() < deadline:
+            try:
+                self.connect_and_run(ctx.args.host, port)
+                break
+            except (ConnectionRefusedError, socket.timeout, OSError):
+                print('.', end='', flush=True)
+                time.sleep(1)
+        else:
+            print("\nConnection timed out.")
+
+    def connect_and_run(self, host, port=18871):
+        config = rpyc.core.protocol.DEFAULT_CONFIG.copy()
+        config['allow_all_attrs'] = True
+        config['sync_request_timeout'] = 120
+        keep_running = False
+        server_msg = None
+        self._prepare_transports()
+        interrupted = False
+        while True:
+            try:
+                if interrupted:
+                    _logger.info("remote: Session interrupted")
+                    interrupted = False  # we are handling the interruption ATM
+                    # next line can raise exception due to connection being
+                    # lost so let's set the default behavior to quitting
+                    keep_running = False
+                    keep_running = self._handle_interrupt()
+                    if not keep_running:
+                        break
+                conn = rpyc.connect(host, port, config=config)
+                keep_running = True
+
+                def quitter(msg):
+                    # this will be called when the slave decides to disconnect
+                    # this master
+                    nonlocal server_msg
+                    nonlocal keep_running
+                    keep_running = False
+                    server_msg = msg
+                with contextlib.suppress(AttributeError):
+                    # TODO: REMOTE_API
+                    # when bumping the remote api make this bit obligatory
+                    # i.e. remove the suppressing
+                    conn.root.register_master_blaster(quitter)
+                self._sa = conn.root.get_sa()
+                self.sa.conn = conn
+                # TODO: REMOTE API RAPI: Remove this API on the next RAPI bump
+                # the check and bailout is not needed if the slave as up to
+                # date as this master, so after bumping RAPI we can assume
+                # that slave is always passwordless
+                if not self.sa.passwordless_sudo:
+                    raise SystemExit(
+                        "This version of Checkbox requires the service"
+                        " to be run as root")
+                try:
+                    slave_api_version = self.sa.get_remote_api_version()
+                except AttributeError:
+                    raise SystemExit("Service doesn't declare Remote API"
+                                     " version. Update Checkbox on the"
+                                     " SUT!")
+                master_api_version = RemoteSessionAssistant.REMOTE_API_VERSION
+                if slave_api_version != master_api_version:
+                    raise SystemExit(
+                        "Remote API version mismatch. Service "
+                        "uses: {}. Remote uses: {}".format(
+                            slave_api_version, master_api_version))
+                state, payload = self.sa.whats_up()
+                _logger.info("remote: Main dispatch with state: %s", state)
+                keep_running = {
+                    'idle': self.new_session,
+                    'running': self.wait_and_continue,
+                    'finalizing': self.finish_session,
+                    'testsselected': partial(
+                        self.run_jobs, resumed_session_info=payload),
+                    'bootstrapping': self.restart,
+                    'started': self.restart,
+                    'interacting': partial(
+                        self.resume_interacting, interaction=payload),
+                }[state]()
+            except EOFError as exc:
+                if keep_running:
+                    print("Connection lost!")
+                    # this is yucky but it works, in case of explicit
+                    # connection closing by the slave we get this msg
+                    _logger.info("remote: Connection lost due to: %s", exc)
+                    if str(exc) == 'stream has been closed':
+                        print('Service explicitly disconnected you. Possible '
+                              'reason: new remote connected to the service')
+                        break
+                    print(exc)
+                    time.sleep(1)
+                else:
+                    # if keep_running got set to False it means that the
+                    # network interruption was planned, AKA slave disconnected
+                    # this master
+                    print(server_msg)
+                    break
+            except (ConnectionRefusedError, socket.timeout, OSError) as exc:
+                _logger.info("remote: Connection lost due to: %s", exc)
+                if not keep_running:
+                    raise
+                # it's reconnecting, so we can ignore refuses
+                print('.', flush=True)
+                time.sleep(0.5)
+            except KeyboardInterrupt:
+                interrupted = True
+
+            if not keep_running:
+                break
+
+    def new_session(self):
+        _logger.info("remote: Starting new session.")
+        configuration = dict()
+        session_dir = os.environ.get('PLAINBOX_SESSION_DIR', '/tmp')
+        launcher_file = os.path.join(session_dir, 'launcher')
+        if os.path.isfile(launcher_file):
+            with open(launcher_file, 'rt', encoding='UTF-8') as f:
+                self._launcher_text = f.read()
+        configuration['launcher'] = self._launcher_text
+        configuration['normal_user'] = self._normal_user
+        self.sa.start_session(configuration)
+        self.sa.hand_pick_jobs(self.selection)
+        self.run_jobs()
+
+    def register_arguments(self, parser):
+        parser.add_argument('--host', help="target host",
+                            **environ_or_required('TARGET_HOST'))
+        parser.add_argument('--port', type=int, default=18871, help=(
+            "port to connect to"))
+        parser.add_argument(
+            'PATTERN',
+            help="run a job matching the given regular expression")
+
+    def _handle_interrupt(self):
+        """
+        Returns True if the remote should keep running.
+        And False if it should quit.
+        """
+        self._sa.terminate()
+        return False
+
+    def cleanup_ephemeral_session(self):
+        storage = self.sa.manager.storage
+        self.sa.finalize_session()
+        storage.remove()
+
+    def finish_session(self):
+        self.cleanup_ephemeral_session()
+        return_code = self.job_result.return_code
+        if return_code:
+            raise SystemExit(return_code)
+
+    def wait_and_continue(self):
+        progress = self.sa.whats_up()[1]
+        print("Rejoined session.")
+        print("In progress: {} ({}/{})".format(
+            progress[2], progress[0], progress[1]))
+        self.wait_for_job()
+        self.run_jobs()
+
+    def _handle_last_job_after_resume(self, resumed_session_info):
+        jobs_repr = json.loads(
+            self.sa.get_jobs_repr([resumed_session_info['last_job']]))
+        job = jobs_repr[-1]
+        self.job_result = self.sa.get_job_result(job['id'])
+
+    def run_jobs(self, resumed_session_info=None):
+        if resumed_session_info and resumed_session_info['last_job']:
+            self._handle_last_job_after_resume(resumed_session_info)
+        _logger.info("remote: Running jobs.")
+        jobs = self.sa.get_session_progress()
+        _logger.debug("remote: Jobs to be run:\n%s",
+                      '\n'.join([' ' + job for job in jobs]))
+        total_num = len(jobs['done']) + len(jobs['todo'])
+        if total_num > 1:
+            self.cleanup_ephemeral_session()
+            raise SystemExit("More than one job to run!")
+        if total_num == 0:
+            self.cleanup_ephemeral_session()
+            raise SystemExit(
+                "No job found with this id: {}".format(self.selection[0]))
+        jobs_repr = json.loads(
+            self.sa.get_jobs_repr(jobs['todo'], len(jobs['done'])))
+
+        self._run_jobs(jobs_repr, total_num)
+        self.finish_session()
+
+    def resume_interacting(self, interaction):
+        self.sa.remember_users_response('rollback')
+        self.run_jobs()
+
+    def wait_for_job(self, dont_finish=False):
+        _logger.info("remote: Waiting for job to finish.")
+        while True:
+            state, payload = self.sa.monitor_job()
+            if payload and not self._is_bootstrapping:
+                for line in payload.splitlines():
+                    if line.startswith('stderr'):
+                        print(line[6:], file=sys.stderr)
+                    else:
+                        print(line[6:])
+            if state == 'running':
+                time.sleep(0.5)
+                while True:
+                    res = select.select([sys.stdin], [], [], 0)
+                    if not res[0]:
+                        break
+                    # XXX: this assumes that sys.stdin is chunked in lines
+                    buff = res[0][0].readline()
+                    self.sa.transmit_input(buff)
+                    if not buff:
+                        break
+            else:
+                if dont_finish:
+                    return
+                self.finish_job()
+                break
+
+    def finish_job(self, result=None):
+        _logger.info("remote: Finishing job with a result: %s", result)
+        self.job_result = self.sa.finish_job(result)
+
+    def abandon(self):
+        _logger.info("remote: Abandoning session.")
+        self.sa.finalize_session()
+
+    def restart(self):
+        _logger.info("remote: Restarting session.")
+        self.abandon()
+        self.new_session()
+
+    def _run_jobs(self, jobs_repr, total_num=0):
+        for job in jobs_repr:
+            # print("ID: {0}".format(job['id']))
+            if 'user' in job['plugin'] or 'manual' in job['plugin']:
+                self.cleanup_ephemeral_session()
+                raise SystemExit("Not an automated job: {0}".format(job['id']))
+            next_job = False
+            while next_job is False:
+                list(self.sa.run_job(job['id']))
+                self.wait_for_job()
+                break
+            if next_job:
+                continue
diff --git a/plainbox/impl/execution.py b/plainbox/impl/execution.py
index e1aa65b..06c19a1 100644
--- a/plainbox/impl/execution.py
+++ b/plainbox/impl/execution.py
@@ -468,6 +468,11 @@ def get_execution_environment(job, environ, session_id, nest_dir):
     # Add per-session shared state directory
     env['PLAINBOX_SESSION_SHARE'] = WellKnownDirsHelper.session_share(
         session_id)
+    # Add the session directory
+    env['PLAINBOX_SESSION_DIR'] = WellKnownDirsHelper.session_dir(
+        session_id)
+    # Force the stdout and stderr streams to be unbuffered
+    env['PYTHONUNBUFFERED'] = "1"

     def set_if_not_none(envvar, source):
         """Update env if the source variable is not None"""
@@ -542,6 +547,8 @@ def get_differential_execution_environment(job, environ, session_id, nest_dir,
     for key, value in base_env.items():
         if key in copy_vars or key.startswith('SNAP'):
             delta_env[key] = value
+    # Force the stdout and stderr streams to be unbuffered
+    delta_env['PYTHONUNBUFFERED'] = "1"
     return delta_env


diff --git a/plainbox/impl/session/assistant.py b/plainbox/impl/session/assistant.py
index c560828..253148e 100644
--- a/plainbox/impl/session/assistant.py
+++ b/plainbox/impl/session/assistant.py
@@ -613,6 +613,12 @@ class SessionAssistant:
         self._context.state.metadata.app_blob = updated_blob
         self._manager.checkpoint()

+    def save_launcher_file(self, launcher_text):
+        session_dir = WellKnownDirsHelper.session_dir(self.get_session_id())
+        launcher_file = os.path.join(session_dir, 'launcher')
+        with open(launcher_file, 'wt', encoding='UTF-8') as f:
+            f.write(launcher_text)
+
     @morris.signal
     def session_available(self, session_id):
         """
diff --git a/plainbox/impl/session/remote_assistant.py b/plainbox/impl/session/remote_assistant.py
index 96f5de4..b767722 100644
--- a/plainbox/impl/session/remote_assistant.py
+++ b/plainbox/impl/session/remote_assistant.py
@@ -31,6 +31,7 @@ from subprocess import CalledProcessError, check_output

 from plainbox.impl.config import Configuration
 from plainbox.impl.execution import UnifiedRunner
+from plainbox.impl.session import SessionMetaData
 from plainbox.impl.session.assistant import SessionAssistant
 from plainbox.impl.session.assistant import SA_RESTARTABLE
 from plainbox.impl.session.jobs import InhibitionCause
@@ -185,6 +186,9 @@ class RemoteSessionAssistant():
     def update_app_blob(self, app_blob):
         self._sa.update_app_blob(app_blob)

+    def save_launcher_file(self, launcher_text):
+        self._sa.save_launcher_file(launcher_text)
+
     def allowed_when(*states):
         def wrap(f):
             def fun(self, *args):
@@ -301,6 +305,7 @@ class RemoteSessionAssistant():
             'effective_normal_user': self._normal_user,
         }).encode("UTF-8")
         self._sa.update_app_blob(new_blob)
+        self._sa.save_launcher_file(configuration['launcher'])
         self._sa.configure_application_restart(self._cmd_callback)

         self._session_id = self._sa.get_session_id()
@@ -333,6 +338,11 @@ class RemoteSessionAssistant():
     def get_bootstrapping_todo_list(self):
         return self._sa.get_bootstrap_todo_list()

+    @allowed_when(Started)
+    def hand_pick_jobs(self, id_patterns):
+        self._sa.hand_pick_jobs(id_patterns)
+        self._state = TestsSelected
+
     def finish_bootstrap(self):
         self._sa.finish_bootstrap()
         self._state = Bootstrapped
@@ -646,19 +656,19 @@ class RemoteSessionAssistant():
             'extra_env': self.prepare_extra_env,
         }
         meta = self._sa.resume_session(session_id, runner_kwargs=runner_kwargs)
-        app_blob = json.loads(meta.app_blob.decode("UTF-8"))
-        launcher = app_blob['launcher']
-        self._launcher = Configuration.from_text(launcher, 'Remote launcher')
-        self._sa.use_alternate_configuration(self._launcher)
-
-        self._normal_user = app_blob.get(
-            'effective_normal_user', self._launcher.get_value(
-                'daemon', 'normal_user'))
-        _logger.info(
-            "normal_user after loading metadata: %r", self._normal_user)
-        test_plan_id = app_blob['testplan_id']
-        self._sa.select_test_plan(test_plan_id)
-        self._sa.bootstrap()
+        if SessionMetaData.FLAG_TESTPLANLESS not in meta.flags:
+            app_blob = json.loads(meta.app_blob.decode("UTF-8"))
+            launcher = app_blob['launcher']
+            self._launcher = Configuration.from_text(launcher, 'Remote launcher')
+            self._sa.use_alternate_configuration(self._launcher)
+            self._normal_user = app_blob.get(
+                'effective_normal_user', self._launcher.get_value(
+                    'daemon', 'normal_user'))
+            _logger.info(
+                "normal_user after loading metadata: %r", self._normal_user)
+            test_plan_id = app_blob['testplan_id']
+            self._sa.select_test_plan(test_plan_id)
+            self._sa.bootstrap()
         self._last_job = meta.running_job_name

         result_dict = {
diff --git a/plainbox/impl/session/restart.py b/plainbox/impl/session/restart.py
index b3c72cd..a79741b 100644
--- a/plainbox/impl/session/restart.py
+++ b/plainbox/impl/session/restart.py
@@ -255,7 +255,7 @@ class RemoteDebRestartStrategy(RemoteSnappyRestartStrategy):
         with open(self.session_resume_filename, 'wt') as f:
             f.write(session_id)
             os.fsync(f.fileno())
-        if cmd == self.service_name:
+        if cmd != self.service_name:
             subprocess.call(['systemctl', 'disable', self.service_name])


