Merge ~kissiel/checkbox-support:lsusb-py-replacement into checkbox-support:master

Proposed by Maciej Kisielewski
Status: Merged
Approved by: Maciej Kisielewski
Approved revision: 0433f1a98e54d3cb9ed7e623677bc51947280f5e
Merged at revision: 2777d43f04cce0f0d42413b05ed4d78153b8fa6e
Proposed branch: ~kissiel/checkbox-support:lsusb-py-replacement
Merge into: checkbox-support:master
Diff against target: 509 lines (+480/-0)
4 files modified
checkbox_support/parsers/sysfs_usb.py (+309/-0)
checkbox_support/parsers/tests/test_sysfs_usb.py (+126/-0)
checkbox_support/scripts/lsusb3.py (+43/-0)
setup.py (+2/-0)
Reviewer | Review Type | Date Requested | Status
Jonathan Cave (community) | | | Approve
Maciej Kisielewski | | | Needs Resubmitting
Review via email: mp+391989@code.launchpad.net

Description of the change

Add a replacement for the old, bad, and difficult-to-work-with lsusb.py.

The new one is implemented in Python 3, provides a simpler interface to USB devices, and should be much easier to use for our use cases.

I placed a few '# REVIEW' markers in the code where I'm not sure my approach is right, and would like a +1/-1 on each of them.

The easiest way to test it is to run the following from the directory containing sysfs_usb.py:

python3 -c 'import sysfs_usb; print(*[dev.to_str() for dev in sysfs_usb.get_usb_devices()], sep="\n")'
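For context, here is a rough standalone sketch of what that command exercises: each device is a sysfs directory whose attribute files (idVendor, idProduct, and so on) are read and formatted. Only `read_entry` mirrors a function in the diff; the fake node built in a temporary directory and the helper names `short_id` and `make_fake_node` are assumptions made for illustration.

```python
import os
import tempfile


def read_entry(sysfs_path, field):
    """Read a single sysfs attribute (same idea as read_entry in the diff)."""
    with open(os.path.join(sysfs_path, field), 'rt') as fentry:
        return fentry.readline().strip('\n')


def short_id(sysfs_path):
    """Hypothetical helper: format 'ID vvvv:pppp' from the attribute files."""
    vid = int(read_entry(sysfs_path, 'idVendor'), 16)
    pid = int(read_entry(sysfs_path, 'idProduct'), 16)
    return 'ID {:04x}:{:04x}'.format(vid, pid)


def make_fake_node(root, name, attrs):
    """Hypothetical fixture: create a directory that mimics a sysfs node."""
    node = os.path.join(root, name)
    os.makedirs(node)
    for field, value in attrs.items():
        with open(os.path.join(node, field), 'wt') as attr_file:
            attr_file.write(value + '\n')
    return node


# Build a stand-in for /sys/bus/usb/devices/usb1 and read it back.
with tempfile.TemporaryDirectory() as root:
    node = make_fake_node(
        root, 'usb1', {'idVendor': '1d6b', 'idProduct': '0002'})
    print(short_id(node))  # ID 1d6b:0002
```

On a real system the directories come from /sys/bus/usb/devices/usb*, so no fixture is needed.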

Jonathan Cave (jocave) wrote :

If you're testing this deployed with checkbox-support, the invocation line should be:

python3 -c 'from checkbox_support.parsers import sysfs_usb; print(*[dev.to_str() for dev in sysfs_usb.get_usb_devices()], sep="\n")'

Jonathan Cave (jocave) wrote :

See below

review: Needs Fixing
Jonathan Cave (jocave) wrote :

For reference, this is the output I got when testing on a rpi4b8g UC20 arm64: https://paste.ubuntu.com/p/wzDjRdxSKw/

Maciej Kisielewski (kissiel) wrote :

Added an entry point, checkbox-support-lsusb3, for testing and comparison. I'll rename it to just checkbox-support-lsusb and delete the previous implementation once the MR is approved.

The entry point accepts the -s and -f options with the same behavior as before. I also added -l, which selects the new output format that is already implemented, for future/advanced usage.
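A minimal sketch of how the three options select an output format, modeled on main() in lsusb3.py from the diff. `build_parser` and `pick_formatter` are hypothetical names introduced here; the real script calls the device methods directly instead of returning their names.

```python
import argparse


def build_parser():
    """Build a parser with the same three options as lsusb3.py."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '-s', '--short', action='store_true',
        help="Print output in a short form")
    parser.add_argument(
        '-l', '--long', action='store_true',
        help="Use the new output format")
    parser.add_argument('-f', '--file', help="Path to the usb.ids file")
    return parser


def pick_formatter(args):
    """Return the name of the UsbDevice method that would be used."""
    if args.short:
        return 'to_short_str'
    if args.long:
        return 'to_str'
    # no flag given: keep output compatible with the old lsusb.py
    return 'to_legacy_str'


parser = build_parser()
for argv in ([], ['-s'], ['-l'], ['-s', '-l']):
    print(argv, '->', pick_formatter(parser.parse_args(argv)))
```

Note that -s takes precedence when both -s and -l are given, because it is checked first.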

Also tweaked a few things in the output.

review: Needs Resubmitting
Maciej Kisielewski (kissiel) wrote :

Added unit tests.
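The tests feed small usb.ids snippets to the parser by patching `open` with `mock_open`. The pattern can be sketched standalone as below; `load_vendors` is a hypothetical, much simplified reader (vendor lines only), and the sketch patches `builtins.open` rather than the module-level name patched in the real tests.

```python
import string
from unittest.mock import mock_open, patch


def load_vendors(path):
    """Hypothetical helper: map vendor IDs to names from a usb.ids-style file."""
    vendors = {}
    with open(path, 'rt') as ids_file:
        for line in ids_file.read().splitlines():
            vid = line[:4]
            # vendor lines start at column 0 with four hex digits
            if len(vid) == 4 and all(c in string.hexdigits for c in vid):
                vendors[int(vid, 16)] = line[4:].strip()
    return vendors


def demo():
    """Run load_vendors against in-memory content instead of a real file."""
    fake_content = "# comment\n0042  ACME\n\t0042  Seafourium\n"
    with patch('builtins.open', mock_open(read_data=fake_content)):
        return load_vendors('/nonexistent/usb.ids')


print(demo())  # {66: 'ACME'}
```

The tab-indented product line is skipped here because its first four characters include the tab, which is not a hex digit.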

review: Needs Resubmitting
Jonathan Cave (jocave) wrote :

Reporting the first problem I spotted; will continue reading...

review: Needs Fixing
Maciej Kisielewski (kissiel) wrote :

Indeed. Fixed in 0433f1a.

review: Needs Resubmitting
Jonathan Cave (jocave) wrote :

Great, the rest looks good to me. Thanks for making something we should be able to maintain without too much trouble!

review: Approve

Preview Diff

1diff --git a/checkbox_support/parsers/sysfs_usb.py b/checkbox_support/parsers/sysfs_usb.py
2new file mode 100644
3index 0000000..56fbacd
4--- /dev/null
5+++ b/checkbox_support/parsers/sysfs_usb.py
6@@ -0,0 +1,309 @@
7+# This file is part of Checkbox.
8+#
9+# Copyright 2020 Canonical Ltd.
10+# Written by:
11+# Maciej Kisielewski <maciej.kisielewski@canonical.com>
12+#
13+# Checkbox is free software: you can redistribute it and/or modify
14+# it under the terms of the GNU General Public License version 3,
15+# as published by the Free Software Foundation.
16+#
17+# Checkbox is distributed in the hope that it will be useful,
18+# but WITHOUT ANY WARRANTY; without even the implied warranty of
19+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20+# GNU General Public License for more details.
21+#
22+# You should have received a copy of the GNU General Public License
23+# along with Checkbox. If not, see <http://www.gnu.org/licenses/>.
24+
25+"""
26+This module provides information about USB devices available in the system
27+based on the data available in sysfs.
28+"""
29+import contextlib
30+import glob
31+import os
32+import re
33+import string
34+
35+from collections import OrderedDict
36+from enum import Enum
37+from functools import partial
38+
39+
40+def ishex(chars):
41+ """Check if all `chars` are hex digits [0-9a-fA-F]."""
42+ return all([x in string.hexdigits for x in chars])
43+
44+
45+class UsbIds:
46+ """USB IDs database reference."""
47+
48+ def decode_vendor(self, vid):
49+ """Translate vendor ID to a Vendor Name."""
50+ return self._vendors[vid]
51+
52+ def decode_product(self, vid, pid):
53+ """Translate vendor ID and product ID to a device name."""
54+ return '{} {}'.format(
55+ self._vendors[vid], self._products[vid, pid])
56+
57+ def decode_protocol(self, cid, scid, prid):
58+ """
59+ Translate interface class protocol from IDs to a human-readable name.
60+
61+ There is a cascade of fallbacks if some of the more specific IDs are not known.
62+ See implementation for details.
63+ """
64+ return (
65+ self._protocols.get((cid, scid, prid)) or
66+ self._subclasses.get((cid, scid)) or
67+ self._classes.get(cid) or
68+ ''
69+ )
70+
71+ def __init__(self, usb_ids_path=None):
72+ self._vendors = OrderedDict()
73+ self._products = OrderedDict()
74+ self._classes = OrderedDict()
75+ self._subclasses = OrderedDict()
76+ self._protocols = OrderedDict()
77+ if usb_ids_path:
78+ paths = [usb_ids_path]
79+ else:
80+ paths = [
81+ # focal, bionic, xenial, and debian(s)
82+ '/var/lib/usbutils/usb.ids',
83+ # fallback - used in kernel maintainer's repos
84+ '/usr/share/usb.ids',
85+ ]
86+ for path in paths:
87+ if os.path.isfile(path):
88+ # at the time of writing this the usb_ids has one line that
89+ # uses character from beyond standard ascii 7-bit set. namely
90+ # 0xb4 (Accent Acute). I couldn't find information about the
91+ # file's encoding, but iso match it nicely. The other way to
92+ # cover for those cases would be to use surrogates and then
93+ # have some pass to interpret them.
94+ with open(path, 'rt', encoding='iso8859') as usb_ids_file:
95+ self._parse_usb_ids(usb_ids_file.read())
96+
97+ def _parse_usb_ids(self, content):
98+ """Parse the contents of usb.ids file."""
99+
100+ class ParserContext(Enum):
101+ """Context marker for usb.ids parser."""
102+ DEFAULT = 0
103+ VENDOR = 1
104+ CLASS = 2
105+ OTHER = 3
106+
107+ context = ParserContext.DEFAULT
108+
109+ for line in content.splitlines():
110+ if not line or line[0] == '#':
111+ # empty line or a comment
112+ continue
113+ if ishex(line[:4]):
114+ # vendor information
115+ vid = int(line[:4], 16)
116+ self._vendors[vid] = line[6:]
117+ context = ParserContext.VENDOR
118+ continue
119+ if line[0] == '\t' and ishex(line[1:3]):
120+ # classes use only 2 hex digits, devices use 4
121+ if context == ParserContext.VENDOR:
122+ pid = int(line[1:5], 16)
123+ # last added vendor is the current vendor
124+ vid = list(self._vendors.keys())[-1]
125+ self._products[vid, pid] = line[7:]
126+ continue
127+ if context == ParserContext.CLASS:
128+ subclass_id = int(line[1:5], 16)
129+ name = line[5:]
130+ class_id = list(self._classes.keys())[-1]
131+ description = "{}:{}".format(
132+ self._classes[class_id],
133+ name if name != 'Unused' else '')
134+ self._subclasses[class_id, subclass_id] = description
135+ continue
136+ if line[0] == 'C':
137+ context = ParserContext.CLASS
138+ class_id = int(line[2:4], 16)
139+ class_name = line[6:]
140+ self._classes[class_id] = class_name
141+ continue
142+ if line[0:2] == '\t\t' and ishex(line[2:4]):
143+ protocol_id = int(line[2:4], 16)
144+ class_id, subclass_id = list(self._subclasses.keys())[-1]
145+ description = "{}:{}".format(
146+ self._subclasses[class_id, subclass_id], line[6:])
147+ self._protocols[class_id, subclass_id, protocol_id] = (
148+ description)
149+ continue
150+ # if we got here without satisfying any of the above ifs
151+ # then we need to set the parser into a state where lines won't be
152+ # consumed
153+ context = ParserContext.OTHER
154+
155+
156+def read_entry(sysfs_path, field):
157+ """Read a sysfs attribute."""
158+ with open(os.path.join(sysfs_path, field), 'rt') as fentry:
159+ return fentry.readline().strip('\n')
160+
161+
162+# REVIEW : the next two classes share a lot of functionality, I kept them
163+ # separate as there are just two and it helps with readability
164+class UsbInterface(dict):
165+ """
166+ A proxy to sysfs entry for a USB Interface.
167+ """
168+ def __init__(self, sysfs_path, usb_ids, parent):
169+ super().__init__(self)
170+ self.sysfs_path = sysfs_path
171+ self._parent = parent
172+ self._level = 0
173+ while parent:
174+ self._level += 1
175+ parent = parent.parent
176+ # REVIEW : If this is too 'hacky' then I could just go with explicit
177+ # assignment of the fields. May be more readable that way.
178+ hex_int_fields = [
179+ 'bInterfaceClass', 'bInterfaceSubClass', 'bInterfaceProtocol']
180+ for field in hex_int_fields:
181+ self[field] = int(read_entry(sysfs_path, field), 16)
182+ self['bNumEndpoints'] = int(read_entry(sysfs_path, 'bNumEndpoints'))
183+ self['driver'] = ''
184+ self['name'] = ''
185+ with contextlib.suppress(Exception):
186+ self['driver'] = os.path.basename(
187+ os.readlink(os.path.join(sysfs_path, 'driver')))
188+ self['protocol_name'] = usb_ids.decode_protocol(
189+ self['bInterfaceClass'], self['bInterfaceSubClass'],
190+ self['bInterfaceProtocol'])
191+
192+ # REVIEW: I was tempted to overload __str__, but I went with an explicit
193+ # function.
194+ def to_str(self):
195+ """Generate a string representation of this Interface."""
196+ template = (
197+ '{padded_name:16}(IF) {bInterfaceClass:02x}:'
198+ '{bInterfaceSubClass:02x}:{bInterfaceProtocol:02x} '
199+ '{bNumEndpoints}EPs ({protocol_name}) {driver} {name}'
200+ )
201+ padded_name = ' ' * self._level + os.path.basename(self.sysfs_path)
202+ half_done = partial(template.format, padded_name=padded_name)
203+ line = half_done(**self)
204+ return line
205+
206+
207+class UsbDevice(dict):
208+ """
209+ A proxy to sysfs entry for a device.
210+
211+ Attributes can be read as dictionary keys.
212+ Sub-devices are available from the `children` property.
213+ """
214+ def __init__(self, sysfs_path, usb_ids, parent=None):
215+ super().__init__(self)
216+ self.sysfs_path = sysfs_path
217+ self.parent = parent
218+ self._level = 0
219+ while parent:
220+ self._level += 1
221+ parent = parent.parent
222+ self.children = []
223+ self.interfaces = []
224+ hex_int_fields = [
225+ 'bDeviceClass', 'bDeviceSubClass', 'bDeviceProtocol',
226+ 'idVendor', 'idProduct',
227+ ]
228+ for field in hex_int_fields:
229+ self[field] = int(read_entry(sysfs_path, field), 16)
230+ int_fields = ['maxchild', 'bNumInterfaces', 'busnum', 'devnum']
231+ for field in int_fields:
232+ self[field] = int(read_entry(sysfs_path, field))
233+ str_fields = ['version', 'speed', 'bMaxPower']
234+ for field in str_fields:
235+ self[field] = read_entry(sysfs_path, field)
236+ # let's try getting the name directly from sysfs entries
237+ self['name'] = ''
238+ with contextlib.suppress(Exception):
239+ # any of the next three attributes may be missing, so let's try
240+ # going one by one. If an exception is raised while getting any
241+ # part the previous parts will be already stored in self['name']
242+ self['name'] = read_entry(sysfs_path, 'manufacturer')
243+ self['name'] += ' ' + read_entry(sysfs_path, 'product')
244+ self['name'] += ' ' + read_entry(sysfs_path, 'serial')
245+ # for HCI host controller entry we may want to trim the name bit
246+ if self['name'].startswith('Linux'):
247+ regex = r"Linux [^ ]* .hci[-_]hcd"
248+ if re.search(regex, self['name']):
249+ self['name'] = "Linux Foundation {:.2f} root hub".format(
250+ float(self['version']))
251+ # if nothing got read from sysfs we need to consult the USB IDS DB
252+ if not self['name']:
253+ with contextlib.suppress(Exception):
254+ self['name'] = usb_ids.decode_product(
255+ self['idVendor'], self['idProduct'])
256+ if not self['name']:
257+ # last chance, try just the vendor name
258+ with contextlib.suppress(Exception):
259+ self['name'] = usb_ids.decode_vendor(self['idVendor'])
260+ # now onto children, some of them are real usb devices, and some are just
261+ # interfaces that the current device implements
262+ for node in os.listdir(sysfs_path):
263+ if not node[0].isdigit():
264+ continue
265+ sub_path = os.path.join(sysfs_path, node)
266+ if os.path.exists(os.path.join(sub_path, 'bInterfaceClass')):
267+ # interface information
268+ self.interfaces.append(UsbInterface(sub_path, usb_ids, self))
269+ else:
270+ # 'real' USB device
271+ self.children.append(UsbDevice(sub_path, usb_ids, self))
272+
273+ def to_str(self):
274+ """Generate a string representation of this USB Device."""
275+ template = (
276+ '{padded_name:16}{idVendor:04x}:{idProduct:04x} '
277+ '{bDeviceClass:02x} {version} {speed:3}MBit/s {bMaxPower} '
278+ '{bNumInterfaces}IFs ({name})'
279+ )
280+ padded_name = ' ' * self._level + os.path.basename(self.sysfs_path)
281+ half_done = partial(template.format, padded_name=padded_name)
282+ line = half_done(**self)
283+ children_strs = [c.to_str() for c in self.children]
284+ ifaces_strs = [i.to_str() for i in self.interfaces]
285+ return '\n'.join([line] + children_strs + ifaces_strs)
286+
287+ def to_short_str(self):
288+ """Generate a short string representation of this USB Device."""
289+ template = 'ID {idVendor:04x}:{idProduct:04x} {name}'
290+ return '\n'.join([template.format(**self)] + [
291+ c.to_short_str() for c in self.children])
292+
293+ def to_legacy_str(self):
294+ """
295+ Generate a string representation similar to early versions of lsusb.py
296+ written for python2.
297+ """
298+ template = (
299+ 'Bus {busnum:03} Device {devnum:03} '
300+ 'ID {idVendor:04x}:{idProduct:04x} {name}'
301+ )
302+ return '\n'.join([template.format(**self)] + [
303+ c.to_legacy_str() for c in self.children])
304+
305+
306+def get_usb_devices(usb_ids=None):
307+ """
308+ Get dict-like objects representing USB devices.
309+
310+ `usb_ids` argument should be an instance of UsbIds. If not supplied
311+ one with default settings will be created.
312+ """
313+ usb_ids = usb_ids or UsbIds()
314+ for node in glob.glob("/sys/bus/usb/devices/usb*"):
315+ yield UsbDevice(node, usb_ids)
316diff --git a/checkbox_support/parsers/tests/test_sysfs_usb.py b/checkbox_support/parsers/tests/test_sysfs_usb.py
317new file mode 100644
318index 0000000..2ef681e
319--- /dev/null
320+++ b/checkbox_support/parsers/tests/test_sysfs_usb.py
321@@ -0,0 +1,126 @@
322+# This file is part of Checkbox.
323+#
324+# Copyright 2020 Canonical Ltd.
325+# Written by:
326+# Maciej Kisielewski <maciej.kisielewski@canonical.com>
327+#
328+# Checkbox is free software: you can redistribute it and/or modify
329+# it under the terms of the GNU General Public License version 3,
330+# as published by the Free Software Foundation.
331+#
332+# Checkbox is distributed in the hope that it will be useful,
333+# but WITHOUT ANY WARRANTY; without even the implied warranty of
334+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
335+# GNU General Public License for more details.
336+#
337+# You should have received a copy of the GNU General Public License
338+# along with Checkbox. If not, see <http://www.gnu.org/licenses/>.
339+
340+
341+"""Tests for the sysfs_usb module."""
342+import textwrap
343+
344+from unittest import TestCase
345+from unittest.mock import mock_open, patch
346+
347+from checkbox_support.parsers.sysfs_usb import ishex
348+from checkbox_support.parsers.sysfs_usb import UsbIds
349+
350+
351+class TestIsHex(TestCase):
352+ """Tests for the ishex function."""
353+ def test_good(self):
354+ """Test actual hexes."""
355+ self.assertTrue(ishex('1'))
356+ self.assertTrue(ishex('a'))
357+ self.assertTrue(ishex('5b'))
358+ self.assertTrue(ishex('1234567890abcdef'))
359+
360+ def test_empty(self):
361+ """Test empty string."""
362+ self.assertTrue(ishex(''))
363+
364+ def test_failing(self):
365+ """Test non-hexes."""
366+ self.assertFalse(ishex('g'))
367+ self.assertFalse(ishex('5x'))
368+
369+ def test_hex_literal(self):
370+ """Test hex literal. It uses non-hex char - 'x'."""
371+ self.assertFalse(ishex('0xff'))
372+
373+ def test_bad_type(self):
374+ """Test non string arguments."""
375+ with self.assertRaises(TypeError):
376+ self.assertFalse(ishex(0xff))
377+ with self.assertRaises(TypeError):
378+ self.assertFalse(ishex(True))
379+
380+
381+class TestUsbIds(TestCase):
382+ """Test for the UsbIds class."""
383+ def test_empty(self):
384+ """Test empty database."""
385+ mopen = mock_open(read_data='')
386+ with patch('checkbox_support.parsers.sysfs_usb.open', mopen):
387+ ids = UsbIds()
388+ with self.assertRaises(KeyError):
389+ ids.decode_product(42, 42)
390+ self.assertEqual(ids.decode_protocol(42, 42, 42), '')
391+
392+ def test_full_product(self):
393+ """Test good entry."""
394+ usb_ids_content = textwrap.dedent("""
395+ 0042 ACME
396+ \t0042 Seafourium
397+ """)
398+ mopen = mock_open(read_data=usb_ids_content)
399+ with patch('checkbox_support.parsers.sysfs_usb.open', mopen):
400+ ids = UsbIds()
401+ self.assertEqual(ids.decode_product(0x42, 0x42), 'ACME Seafourium')
402+
403+ def test_vendor_only(self):
404+ """Test entry with vendor only."""
405+ usb_ids_content = textwrap.dedent("""
406+ 0042 ACME
407+ """)
408+ mopen = mock_open(read_data=usb_ids_content)
409+ with patch('checkbox_support.parsers.sysfs_usb.open', mopen):
410+ ids = UsbIds()
411+ self.assertEqual(ids.decode_vendor(0x42), 'ACME')
412+
413+ def test_full_protocol(self):
414+ """Test full protocol triplet."""
415+ usb_ids_content = textwrap.dedent("""
416+ C 42 Explosives
417+ \t06 Bomb
418+ \t\t01 Boom
419+ """)
420+ mopen = mock_open(read_data=usb_ids_content)
421+ with patch('checkbox_support.parsers.sysfs_usb.open', mopen):
422+ ids = UsbIds()
423+ self.assertEqual(ids.decode_protocol(0x42, 0x06, 0x01),
424+ 'Explosives:Bomb:Boom')
425+
426+ def test_class_and_subclass_only(self):
427+ """Test fallback to cid and scid."""
428+ usb_ids_content = textwrap.dedent("""
429+ C 42 Explosives
430+ \t06 Bomb
431+ """)
432+ mopen = mock_open(read_data=usb_ids_content)
433+ with patch('checkbox_support.parsers.sysfs_usb.open', mopen):
434+ ids = UsbIds()
435+ self.assertEqual(ids.decode_protocol(0x42, 0x06, 0x01),
436+ 'Explosives:Bomb')
437+
438+ def test_class_only(self):
439+ """Test fallback to cid."""
440+ usb_ids_content = textwrap.dedent("""
441+ C 42 Explosives
442+ """)
443+ mopen = mock_open(read_data=usb_ids_content)
444+ with patch('checkbox_support.parsers.sysfs_usb.open', mopen):
445+ ids = UsbIds()
446+ self.assertEqual(ids.decode_protocol(0x42, 0x06, 0x01),
447+ 'Explosives')
448diff --git a/checkbox_support/scripts/lsusb3.py b/checkbox_support/scripts/lsusb3.py
449new file mode 100644
450index 0000000..a2d4677
451--- /dev/null
452+++ b/checkbox_support/scripts/lsusb3.py
453@@ -0,0 +1,43 @@
454+# This file is part of Checkbox.
455+#
456+# Copyright 2020 Canonical Ltd.
457+# Written by:
458+# Maciej Kisielewski <maciej.kisielewski@canonical.com>
459+#
460+# Checkbox is free software: you can redistribute it and/or modify
461+# it under the terms of the GNU General Public License version 3,
462+# as published by the Free Software Foundation.
463+#
464+# Checkbox is distributed in the hope that it will be useful,
465+# but WITHOUT ANY WARRANTY; without even the implied warranty of
466+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
467+# GNU General Public License for more details.
468+#
469+# You should have received a copy of the GNU General Public License
470+# along with Checkbox. If not, see <http://www.gnu.org/licenses/>.
471+
472+import argparse
473+from checkbox_support.parsers import sysfs_usb
474+
475+def main():
476+ parser = argparse.ArgumentParser()
477+ parser.add_argument(
478+ '-s', '--short', action='store_true',
479+ help="Print output in a short form")
480+ parser.add_argument(
481+ '-l', '--long', action='store_true',
482+ help="Use the new output format")
483+ parser.add_argument('-f', '--file', help="Path to the usb.ids file")
484+ args = parser.parse_args()
485+
486+ usb_ids = sysfs_usb.UsbIds(args.file)
487+ for dev in sysfs_usb.get_usb_devices(usb_ids):
488+ if args.short:
489+ print(dev.to_short_str())
490+ elif args.long:
491+ print(dev.to_str())
492+ else:
493+ print(dev.to_legacy_str())
494+
495+if __name__ == '__main__':
496+ main()
497diff --git a/setup.py b/setup.py
498index 75dbc11..d5d4a59 100755
499--- a/setup.py
500+++ b/setup.py
501@@ -93,6 +93,8 @@ setup(
502 "checkbox_support.scripts.eddystone_scanner:main"),
503 ("checkbox-support-lsusb="
504 "checkbox_support.scripts.lsusb:main"),
505+ ("checkbox-support-lsusb3="
506+ "checkbox_support.scripts.lsusb3:main"),
507 ],
508 },
509 )
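
As a standalone illustration of the fallback cascade in `UsbIds.decode_protocol` (sysfs_usb.py in the diff above): the most specific key is tried first, then the lookup falls back to the subclass, then the class, then an empty string. The free function and the three plain dicts below are assumptions made for this sketch; in the branch they are attributes populated from usb.ids.

```python
def decode_protocol(classes, subclasses, protocols, cid, scid, prid):
    """Return the most specific known name for a class/subclass/protocol."""
    return (
        protocols.get((cid, scid, prid)) or
        subclasses.get((cid, scid)) or
        classes.get(cid) or
        ''
    )


# Sample data matching the entries used in the unit tests.
classes = {0x42: 'Explosives'}
subclasses = {(0x42, 0x06): 'Explosives:Bomb'}
protocols = {(0x42, 0x06, 0x01): 'Explosives:Bomb:Boom'}

# full triplet known
print(decode_protocol(classes, subclasses, protocols, 0x42, 0x06, 0x01))
# unknown protocol: falls back to the subclass name
print(decode_protocol(classes, subclasses, protocols, 0x42, 0x06, 0x02))
# unknown subclass: falls back to the class name
print(decode_protocol(classes, subclasses, protocols, 0x42, 0x07, 0x01))
```

Because the chain uses `or`, an entry stored as an empty string is treated the same as a missing one and the lookup moves on to the next level.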
