Merge ~sylvain-pineau/checkbox-support:eddystone-scanner into checkbox-support:master

Proposed by Sylvain Pineau
Status: Merged
Approved by: Sylvain Pineau
Approved revision: 6b02a8bfe37b28f69054617a78da4866006047d1
Merged at revision: 52937e937a410bc379b56b8b5f4d5f554d381a68
Proposed branch: ~sylvain-pineau/checkbox-support:eddystone-scanner
Merge into: checkbox-support:master
Diff against target: 1794 lines (+1748/-0)
7 files modified
checkbox_support/scripts/eddystone_scanner.py (+87/-0)
checkbox_support/vendor/__init__.py (+0/-0)
checkbox_support/vendor/aioblescan/LICENSE.txt (+20/-0)
checkbox_support/vendor/aioblescan/__init__.py (+2/-0)
checkbox_support/vendor/aioblescan/aioblescan.py (+1275/-0)
checkbox_support/vendor/aioblescan/eddystone.py (+362/-0)
setup.py (+2/-0)
Reviewer Review Type Date Requested Status
Sylvain Pineau (community) Approve
Review via email: mp+356229@code.launchpad.net

Description of the change

New console script to scan for Eddystone URL beacon advertisements.

To post a comment you must log in.
Revision history for this message
Sylvain Pineau (sylvain-pineau) wrote :

Tested last week on a large set of devices used to run SRU tests in cert lab and devices running UC.

Self-approved

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
diff --git a/checkbox_support/scripts/eddystone_scanner.py b/checkbox_support/scripts/eddystone_scanner.py
0new file mode 1006440new file mode 100644
index 0000000..3ee398a
--- /dev/null
+++ b/checkbox_support/scripts/eddystone_scanner.py
@@ -0,0 +1,87 @@
1#!/usr/bin/env python3
2# encoding: UTF-8
3# Copyright (c) 2018 Canonical Ltd.
4#
5# Authors:
6# Sylvain Pineau <sylvain.pineau@canonical.com>
7#
8# This program is free software: you can redistribute it and/or modify
9# it under the terms of the GNU General Public License as published by
10# the Free Software Foundation, either version 3 of the License, or
11# (at your option) any later version.
12#
13# This program is distributed in the hope that it will be useful,
14# but WITHOUT ANY WARRANTY; without even the implied warranty of
15# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16# GNU General Public License for more details.
17#
18# You should have received a copy of the GNU General Public License
19# along with this program. If not, see <http://www.gnu.org/licenses/>.
20
21import argparse
22import asyncio
23
24from checkbox_support.vendor.aioblescan import create_bt_socket
25from checkbox_support.vendor.aioblescan import BLEScanRequester
26from checkbox_support.vendor.aioblescan import HCI_Cmd_LE_Advertise
27from checkbox_support.vendor.aioblescan import HCI_Event
28from checkbox_support.vendor.aioblescan.eddystone import EddyStone
29
30
31def main():
32 parser = argparse.ArgumentParser(
33 description="Track BLE advertised packets")
34 parser.add_argument("-D", "--device", default='hci0',
35 help="Select the hciX device to use "
36 "(default hci0).")
37
38 async def timeout():
39 await asyncio.sleep(10.0)
40
41 def ble_process(data):
42 ev = HCI_Event()
43 ev.decode(data)
44 advertisement = EddyStone().decode(ev)
45 if advertisement:
46 print("EddyStone URL: {}".format(advertisement['url']))
47 for task in asyncio.Task.all_tasks():
48 task.cancel()
49
50 try:
51 opts = parser.parse_args()
52 except Exception as e:
53 parser.error("Error: " + str(e))
54 return 1
55 event_loop = asyncio.get_event_loop()
56 # First create and configure a STREAM socket
57 try:
58 mysocket = create_bt_socket(int(opts.device.replace('hci', '')))
59 except OSError as e:
60 print(e)
61 return 1
62 # Create a connection with the STREAM socket
63 fac = event_loop._create_connection_transport(
64 mysocket, BLEScanRequester, None, None)
65 # Start it
66 conn, btctrl = event_loop.run_until_complete(fac)
67 # Attach processing
68 btctrl.process = ble_process
69 # Probe
70 btctrl.send_scan_request()
71 try:
72 event_loop.run_until_complete(timeout())
73 return 1
74 except asyncio.CancelledError:
75 return 0
76 except KeyboardInterrupt:
77 return 1
78 finally:
79 btctrl.stop_scan_request()
80 command = HCI_Cmd_LE_Advertise(enable=False)
81 btctrl.send_command(command)
82 conn.close()
83 event_loop.close()
84
85
86if __name__ == '__main__':
87 raise SystemExit(main())
diff --git a/checkbox_support/vendor/__init__.py b/checkbox_support/vendor/__init__.py
0new file mode 10064488new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/checkbox_support/vendor/__init__.py
diff --git a/checkbox_support/vendor/aioblescan/LICENSE.txt b/checkbox_support/vendor/aioblescan/LICENSE.txt
1new file mode 10064489new file mode 100644
index 0000000..cb44740
--- /dev/null
+++ b/checkbox_support/vendor/aioblescan/LICENSE.txt
@@ -0,0 +1,20 @@
1The MIT License (MIT)
2
3Copyright © 2017 François Wautier
4
5Permission is hereby granted, free of charge, to any person obtaining a copy of this
6oftware and associated documentation files (the "Software"), to deal in the Software
7without restriction, including without limitation the rights to use, copy, modify,
8merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
9permit persons to whom the Software is furnished to do so, subject to the following
10conditions:
11
12The above copyright notice and this permission notice shall be included in all copies
13or substantial portions of the Software.
14
15THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
16INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
17PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE
18FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT ORxi
19OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
20DEALINGS IN THE SOFTWARE.
diff --git a/checkbox_support/vendor/aioblescan/__init__.py b/checkbox_support/vendor/aioblescan/__init__.py
0new file mode 10064421new file mode 100644
index 0000000..5fdf345
--- /dev/null
+++ b/checkbox_support/vendor/aioblescan/__init__.py
@@ -0,0 +1,2 @@
1from .aioblescan import *
2__version__ = '0.2.1'
diff --git a/checkbox_support/vendor/aioblescan/aioblescan.py b/checkbox_support/vendor/aioblescan/aioblescan.py
0new file mode 1006443new file mode 100644
index 0000000..b0d9d0d
--- /dev/null
+++ b/checkbox_support/vendor/aioblescan/aioblescan.py
@@ -0,0 +1,1275 @@
1#!/usr/bin/env python3
2# -*- coding:utf-8 -*-
3#
4# This application is simply a python only Bluetooth LE Scan command with
5# decoding of advertised packets
6#
7# Copyright (c) 2017 François Wautier
8#
9# Note large part of this code was taken from scapy and other opensource software
10#
11# Permission is hereby granted, free of charge, to any person obtaining a copy
12# of this software and associated documentation files (the "Software"), to deal
13# in the Software without restriction, including without limitation the rights
14# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
15# of the Software, and to permit persons to whom the Software is furnished to do so,
16# subject to the following conditions:
17#
18# The above copyright notice and this permission notice shall be included in all copies
19# or substantial portions of the Software.
20#
21# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
22# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
23# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
24# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
25# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
26# IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE
27
28import socket, asyncio, sys
29from struct import pack, unpack, calcsize
30
31
32#A little bit of HCI
33HCI_COMMAND = 0x01
34HCI_ACL_DATA = 0x02
35HCI_SCO_DATA = 0x03
36HCI_EVENT = 0x04
37HCI_VENDOR = 0x05
38
39PRINT_INDENT=" "
40
41CMD_SCAN_REQUEST = 0x200c #mixing the OGF in with that HCI shift
42
43#
44EDDY_UUID=b"\xfe\xaa" #Google UUID
45
46#Generated from https://www.uuidgenerator.net/ 906ed6ab-6785-4eab-9847-bf9889c098ae alternative is 668997f8-4acd-48ea-b35b-749e54215860
47MY_UUID = b'\x90\x6e\xd6\xab\x67\x85\x4e\xab\x98\x47\xbf\x98\x89\xc0\x98\xae'
48#MY_UUID = b'\x66\x89\x97\xf8\x4a\xcd\x48\xea\xb3\x5b\x74\x9e\x54\x21\x58\x60'
49#
50# Let's define some useful types
51#
52class MACAddr:
53 """Class representing a MAC address.
54
55 :param name: The name of the instance
56 :type name: str
57 :param mac: the mac address.
58 :type mac: str
59 :returns: MACAddr instance.
60 :rtype: MACAddr
61
62 """
63 def __init__(self,name,mac="00:00:00:00:00:00"):
64 self.name = name
65 self.val=mac.lower()
66
67 def encode (self):
68 """Encode the MAC address to a byte array.
69
70 :returns: The encoded version of the MAC address
71 :rtype: bytes
72 """
73 return int(self.val.replace(":",""),16).to_bytes(6,"little")
74
75 def decode(self,data):
76 """Decode the MAC address from a byte array.
77
78 This will take the first 6 bytes from data and transform them into a MAC address
79 string representation. This will be assigned to the attribute "val". It then returns
80 the data stream minus the bytes consumed
81
82 :param data: The data stream containing the value to decode at its head
83 :type data: bytes
84 :returns: The datastream minus the bytes consumed
85 :rtype: bytes
86 """
87 self.val=':'.join("%02x" % x for x in reversed(data[:6]))
88 return data[6:]
89
90 def __len__(self):
91 return 6
92
93 def show(self,depth=0):
94 print("{}{}:".format(PRINT_INDENT*depth,self.name))
95 print("{}{}".format(PRINT_INDENT*(depth+1),self.val))
96
97class Bool:
98 """Class representing a boolean value.
99
100 :param name: The name of the instance
101 :type name: str
102 :param val: the boolean value.
103 :type mac: bool
104 :returns: Bool instance.
105 :rtype: Bool
106
107 """
108 def __init__(self,name,val=True):
109 self.name=name
110 self.val=val
111
112 def encode (self):
113 val=(self.val and b'\x01') or b'\x00'
114 return val
115
116 def decode(self,data):
117 self.val= data[:1]==b"\x01"
118 return data[1:]
119
120 def __len__(self):
121 return 1
122
123 def show(self,depth=0):
124 print("{}{}:".format(PRINT_INDENT*depth,self.name))
125 print("{}{}".format(PRINT_INDENT*(depth+1),self.val))
126
127class Byte:
128 """Class representing a single byte value.
129
130 :param name: The name of the instance
131 :type name: str
132 :param val: the single byte value.
133 :type val: byte
134 :returns: Byte instance.
135 :rtype: Byte
136
137 """
138 def __init__(self,name,val=0):
139 self.name=name
140 self.val=val
141
142 def encode (self):
143 val=pack("<c",self.val)
144 return val
145
146 def decode(self,data):
147 self.val= unpack("<c",data[:1])[0]
148 return data[1:]
149
150 def __len__(self):
151 return 1
152
153 def show(self,depth=0):
154 print("{}{}:".format(PRINT_INDENT*depth,self.name))
155 print("{}{}".format(PRINT_INDENT*(depth+1),":".join(map(lambda b: format(b, "02x"), self.val))))
156
157class EnumByte:
158 """Class representing a single byte value from a limited set of value
159
160 :param name: The name of the instance
161 :type name: str
162 :param val: the single byte value.
163 :type val: byte
164 :param loval: the list of possible values.
165 :type loval: dict
166 :returns: EnumByte instance.
167 :rtype: EnumByte
168
169 """
170 def __init__(self,name,val=0,loval={0:"Undef"}):
171 self.name=name
172 self.val=val
173 self.loval=loval
174
175 def encode (self):
176 val=pack(">B",self.val)
177 return val
178
179 def decode(self,data):
180 self.val= unpack(">B",data[:1])[0]
181 return data[1:]
182
183 @property
184 def strval(self):
185 if self.val in self.loval:
186 return self.loval[self.val]
187 else:
188 return str(self.val)
189
190 def __len__(self):
191 return 1
192
193 def show(self,depth=0):
194 print("{}{}:".format(PRINT_INDENT*depth,self.name))
195 if self.val in self.loval:
196 print("{}{}".format(PRINT_INDENT*(depth+1),self.loval[self.val]))
197 else:
198 print("{}Undef".format(PRINT_INDENT*(depth+1)))
199
200class BitFieldByte:
201 """Class representing a single byte value as a bit field.
202
203 :param name: The name of the instance
204 :type name: str
205 :param val: the single byte value.
206 :type val: byte
207 :param loval: the list defining the name of the property represented by each bit.
208 :type loval: list
209 :returns: BitFieldByte instance.
210 :rtype: BitFieldByte
211
212 """
213 def __init__(self,name,val=0,loval=["Undef"]*8):
214 self.name=name
215 self._val=val
216 self.loval=loval
217
218 def encode (self):
219 val=pack(">B",self._val)
220 return val
221
222 def decode(self,data):
223 self._val= unpack(">B",data[:1])[0]
224 return data[1:]
225
226 def __len__(self):
227 return 1
228
229 @property
230 def val(self):
231 resu={}
232 for x in self.loval:
233 if x not in ["Undef","Reserv"]:
234 resu[x]=(self._val & mybit)>0
235 return resu
236
237 def show(self,depth=0):
238 print("{}{}:".format(PRINT_INDENT*depth,self.name))
239 mybit=0x80
240 for x in self.loval:
241 if x not in ["Undef","Reserv"]:
242 print("{}{}: {}".format(PRINT_INDENT*(depth+1),x, ((self._val & mybit) and "True") or False))
243 mybit = mybit >>1
244
245class IntByte:
246 """Class representing a single byte as a signed integer.
247
248 :param name: The name of the instance
249 :type name: str
250 :param val: the integer value.
251 :type val: int
252 :returns: IntByte instance.
253 :rtype: IntByte
254
255 """
256 def __init__(self,name,val=0):
257 self.name=name
258 self.val=val
259
260 def encode (self):
261 val=pack(">b",self.val)
262 return val
263
264 def decode(self,data):
265 self.val= unpack(">b",data[:1])[0]
266 return data[1:]
267
268 def __len__(self):
269 return 1
270
271 def show(self,depth=0):
272 print("{}{}:".format(PRINT_INDENT*depth,self.name))
273 print("{}{}".format(PRINT_INDENT*(depth+1),self.val))
274
275class UIntByte:
276 """Class representing a single byte as an unsigned integer.
277
278 :param name: The name of the instance
279 :type name: str
280 :param val: the integer value.
281 :type val: int
282 :returns: UIntByte instance.
283 :rtype: UIntByte
284
285 """
286 def __init__(self,name,val=0):
287 self.name=name
288 self.val=val
289
290 def encode (self):
291 val=pack(">B",self.val)
292 return val
293
294 def decode(self,data):
295 self.val= unpack(">B",data[:1])[0]
296 return data[1:]
297
298 def __len__(self):
299 return 1
300
301 def show(self,depth=0):
302 print("{}{}:".format(PRINT_INDENT*depth,self.name))
303 print("{}{}".format(PRINT_INDENT*(depth+1),self.val))
304
305class ShortInt:
306 """Class representing 2 bytes as a signed integer.
307
308 :param name: The name of the instance
309 :type name: str
310 :param val: the integer value.
311 :type val: int
312 :param endian: Endianess of the bytes. "big" or no "big" (i.e. "little")
313 :type endian: str
314 :returns: ShortInt instance.
315 :rtype: ShortInt
316
317 """
318 def __init__(self,name,val=0,endian="big"):
319 self.name=name
320 self.val=val
321 self.endian = endian
322
323 def encode (self):
324 if self.endian == "big":
325 val=pack(">h",self.val)
326 else:
327 val=pack("<h",self.val)
328 return val
329
330 def decode(self,data):
331 if self.endian == "big":
332 self.val= unpack(">h",data[:2])[0]
333 else:
334 self.val= unpack("<h",data[:2])[0]
335 return data[2:]
336
337 def __len__(self):
338 return 2
339
340 def show(self,depth=0):
341 print("{}{}:".format(PRINT_INDENT*depth,self.name))
342 print("{}{}".format(PRINT_INDENT*(depth+1),self.val))
343
344class UShortInt:
345 """Class representing 2 bytes as an unsigned integer.
346
347 :param name: The name of the instance
348 :type name: str
349 :param val: the integer value.
350 :type val: int
351 :param endian: Endianess of the bytes. "big" or no "big" (i.e. "little")
352 :type endian: str
353 :returns: UShortInt instance.
354 :rtype: UShortInt
355
356 """
357 def __init__(self,name,val=0,endian="big"):
358 self.name=name
359 self.val=val
360 self.endian = endian
361
362 def encode (self):
363 if self.endian == "big":
364 val=pack(">H",self.val)
365 else:
366 val=pack("<H",self.val)
367 return val
368
369 def decode(self,data):
370 if self.endian == "big":
371 self.val= unpack(">H",data[:2])[0]
372 else:
373 self.val= unpack("<H",data[:2])[0]
374 return data[2:]
375
376 def __len__(self):
377 return 2
378
379 def show(self,depth=0):
380 print("{}{}:".format(PRINT_INDENT*depth,self.name))
381 print("{}{}".format(PRINT_INDENT*(depth+1),self.val))
382
383class LongInt:
384 """Class representing 4 bytes as a signed integer.
385
386 :param name: The name of the instance
387 :type name: str
388 :param val: the integer value.
389 :type val: int
390 :param endian: Endianess of the bytes. "big" or no "big" (i.e. "little")
391 :type endian: str
392 :returns: LongInt instance.
393 :rtype: LongInt
394
395 """
396 def __init__(self,name,val=0,endian="big"):
397 self.name=name
398 self.val=val
399 self.endian = endian
400
401 def encode (self):
402 if self.endian == "big":
403 val=pack(">l",self.val)
404 else:
405 val=pack("<l",self.val)
406 return val
407
408 def decode(self,data):
409 if self.endian == "big":
410 self.val= unpack(">l",data[:4])[0]
411 else:
412 self.val= unpack("<l",data[:4])[0]
413 return data[4:]
414
415 def __len__(self):
416 return 4
417
418 def show(self,depth=0):
419 print("{}{}:".format(PRINT_INDENT*depth,self.name))
420 print("{}{}".format(PRINT_INDENT*(depth+1),self.val))
421
422class ULongInt:
423 """Class representing 4 bytes as an unsigned integer.
424
425 :param name: The name of the instance
426 :type name: str
427 :param val: the integer value.
428 :type val: int
429 :param endian: Endianess of the bytes. "big" or no "big" (i.e. "little")
430 :type endian: str
431 :returns: ULongInt instance.
432 :rtype: ULongInt
433
434 """
435 def __init__(self,name,val=0,endian="big"):
436 self.name=name
437 self.val=val
438 self.endian = endian
439
440 def encode (self):
441 if self.endian == "big":
442 val=pack(">L",self.val)
443 else:
444 val=pack("<L",self.val)
445 return val
446
447 def decode(self,data):
448 if self.endian == "big":
449 self.val= unpack(">L",data[:4])[0]
450 else:
451 self.val= unpack("<L",data[:4])[0]
452 return data[4:]
453
454 def __len__(self):
455 return 4
456
457 def show(self,depth=0):
458 print("{}{}:".format(PRINT_INDENT*depth,self.name))
459 print("{}{}".format(PRINT_INDENT*(depth+1),self.val))
460
461class OgfOcf:
462 """Class representing the 2 bytes that specify the command in an HCI command packet.
463
464 :param name: The name of the instance
465 :type name: str
466 :param ogf: the Op-code Group (6 bits).
467 :type ogf: bytes
468 :param ocf: the Op-code Command (10 bits).
469 :type ocf: bytes
470 :returns: OgfOcf instance.
471 :rtype: OgfOcf
472
473 """
474 def __init__(self,name,ogf=b"\x00",ocf=b"\x00"):
475 self.name=name
476 self.ogf= ogf
477 self.ocf= ocf
478
479 def encode (self):
480 val=pack("<H",(ord(self.ogf) << 10) | ord(self.ocf))
481 return val
482
483 def decode(self,data):
484 val = unpack("<H",data[:len(self)])[0]
485 self.ogf =val>>10
486 self.ocf = int(val - (self.ogf<<10)).to_bytes(1,"big")
487 self.ogf = int(self.ogf).to_bytes(1,"big")
488 return data[len(self):]
489
490 def __len__(self):
491 return calcsize("<H")
492
493 def show(self,depth=0):
494 print("{}Cmd Group:".format(PRINT_INDENT*depth))
495 print("{}{}".format(PRINT_INDENT*(depth+1),self.ogf))
496 print("{}Cmd Code:".format(PRINT_INDENT*depth))
497 print("{}{}".format(PRINT_INDENT*(depth+1),self.ocf))
498
499class Itself:
500 """Class representing a byte array that need no manipulation.
501
502 :param name: The name of the instance
503 :type name: str
504 :returns: Itself instance.
505 :rtype: Itself
506
507 """
508 def __init__(self,name):
509 self.name=name
510 self.val=b""
511
512 def encode(self):
513 val=pack(">%ds"%len(self.val),self.val)
514 return val
515
516 def decode(self,data):
517 self.val=unpack(">%ds"%len(data),data)[0]
518 return b""
519
520 def __len__(self):
521 return len(self.val)
522
523 def show(self,depth=0):
524 print("{}{}:".format(PRINT_INDENT*depth,self.name))
525 print("{}{}".format(PRINT_INDENT*(depth+1),":".join(map(lambda b: format(b, "02x"), self.val))))
526
527class String:
528 """Class representing a string.
529
530 :param name: The name of the instance
531 :type name: str
532 :returns: String instance.
533 :rtype: String
534
535 """
536 def __init__(self,name):
537 self.name=name
538 self.val=""
539
540 def encode(self):
541 if isinstance(self.val,str):
542 self.val = self.val.encode()
543 val=pack(">%ds"%len(self.val),self.val)
544 return val
545
546 def decode(self,data):
547 self.val=data
548 return b""
549
550 def __len__(self):
551 return len(self.val)
552
553 def show(self,depth=0):
554 print("{}{}:".format(PRINT_INDENT*depth,self.name))
555 print("{}{}".format(PRINT_INDENT*(depth+1),self.val))
556
557
558class NBytes:
559 """Class representing a byte string.
560
561 :param name: The name of the instance
562 :type name: str
563 :param length: The length
564 :type length: int
565 :returns: NBytes instance.
566 :rtype: NBytes
567
568 """
569 def __init__(self,name,length=2):
570 self.name=name
571 self.length=length
572 self.val=b""
573
574 def encode(self):
575 val=pack(">%ds"%len(self.length),self.val)
576 return val
577
578 def decode(self,data):
579 self.val=unpack(">%ds"%self.length,data[:self.length])[0][::-1]
580 return data[self.length:]
581
582 def __len__(self):
583 return self.length
584
585 def show(self,depth=0):
586 if self.name:
587 print("{}{}:".format(PRINT_INDENT*depth,self.name))
588 print("{}{}".format(PRINT_INDENT*(depth+1),":".join(map(lambda b: format(b, "02x"), self.val))))
589
590 def __eq__(self,b):
591 return self.val==b
592
593class NBytes_List:
594 """Class representing a list of bytes string.
595
596 :param name: The name of the instance
597 :type name: str
598 :param bytes: Length of the bytes strings (2, 4 or 16)
599 :type bytes: int
600 :returns: NBytes_List instance.
601 :rtype: NBytes_List
602
603 """
604 def __init__(self,name,bytes=2):
605 #Bytes should be one of 2, 4 or 16
606 self.name=name
607 self.length=bytes
608 self.lonbytes = []
609
610 def decode(self,data):
611 while data:
612 mynbyte=NBytes("",self.length)
613 data=mynbyte.decode(data)
614 self.lonbytes.append(mynbyte)
615 return data
616
617 def show(self,depth=0):
618 print("{}{}:".format(PRINT_INDENT*depth,self.name))
619 for x in self.lonbytes:
620 x.show(depth+1)
621
622 def __len__(self):
623 return len(self.lonbytes)+self.length
624
625 def __contains__(self,b):
626 for x in self.lonbytes:
627 if b == x:
628 return True
629
630 return False
631
632class Float88:
633 """Class representing a 8.8 fixed point quantity.
634
635 :param name: The name of the instance
636 :type name: str
637 :returns: Float88 instance.
638 :rtype: Float88
639
640 """
641 def __init__(self,name):
642 self.name=name
643 self.val=0.0
644
645 def encode (self):
646 val=pack(">h",int(self.val*256))
647 return val
648
649 def decode(self,data):
650 self.val= unpack(">h",data)[0]/256.0
651 return data[2:]
652 def __len__(self):
653 return 2
654
655 def show(self,depth=0):
656 print("{}{}:".format(PRINT_INDENT*depth,self.name))
657 print("{}{}".format(PRINT_INDENT*(depth+1),self.val))
658
659
660
661
662class EmptyPayload:
663 def __init__(self):
664 pass
665
666 def encode(self):
667 return b""
668
669 def decode(self,data):
670 return data
671
672 def __len__(self):
673 return 0
674
675 def show(self,depth=0):
676 return
677
678#
679# Bluetooth starts here
680#
681
682class Packet:
683 """Class representing a generic HCI packet.
684
685 :param header: The packet header.
686 :type header: bytes
687 :returns: Packet instance.
688 :rtype: Packet
689
690 """
691 """A generic packet that will be build fromparts"""
692 def __init__(self, header="\x00", fmt=">B"):
693 self.header = header
694 self.fmt = fmt
695 self.payload=[]
696 self.raw_data=None
697
698 def encode (self) :
699 return pack(self.fmt, self.header)
700
701 def decode (self, data):
702 try:
703 if unpack(self.fmt,data[:calcsize(self.fmt)])[0] == self.header:
704 self.raw_data=data
705 return data[calcsize(self.fmt):]
706 except:
707 pass
708 return None
709
710 def retrieve(self,aclass):
711 """Look for a specifc class/name in the packet"""
712 resu=[]
713 for x in self.payload:
714 try:
715 if isinstance(aclass,str):
716 if x.name == aclass:
717 resu.append(x)
718 else:
719 if isinstance(x,aclass):
720 resu.append(x)
721
722 resu+=x.retrieve(aclass)
723 except:
724 pass
725 return resu
726#
727# Commands
728#
729
730class HCI_Command(Packet):
731 """Class representing a command HCI packet.
732
733 :param ogf: the Op-code Group (6 bits).
734 :type ogf: bytes
735 :param ocf: the Op-code Command (10 bits).
736 :type ocf: bytes
737 :returns: HCI_Command instance.
738 :rtype: HCI_Command
739
740 """
741
742 def __init__(self,ogf,ocf):
743 super().__init__(HCI_COMMAND)
744 self.cmd = OgfOcf("command",ogf,ocf)
745 self.payload = []
746
747 def encode(self):
748 pld=b""
749 for x in self.payload:
750 pld+=x.encode()
751 plen=len(pld)
752 pld=b"".join([super().encode(),self.cmd.encode(),pack(">B",plen),pld])
753 return pld
754
755 def show(self,depth=0):
756 self.cmd.show(depth)
757 for x in self.payload:
758 x.show(depth+1)
759
760class HCI_Cmd_LE_Scan_Enable(HCI_Command):
761 """Class representing a command HCI command to enable/disable BLE scanning.
762
763 :param enable: enable/disable scanning.
764 :type enable: bool
765 :param filter_dups: filter duplicates.
766 :type filter_dups: bool
767 :returns: HCI_Cmd_LE_Scan_Enable instance.
768 :rtype: HCI_Cmd_LE_Scan_Enable
769
770 """
771
772 def __init__(self,enable=True,filter_dups=True):
773 super(self.__class__, self).__init__(b"\x08",b"\x0c")
774 self.payload.append(Bool("enable",enable))
775 self.payload.append(Bool("filter",filter_dups))
776
777class HCI_Cmd_LE_Set_Scan_Params(HCI_Command):
778 """Class representing an HCI command to set the scanning parameters.
779
780 This will set a number of parameters related to the scanning functions. For the
781 interval and window, it will always silently enforce the Specs that says it should be >= 2.5 ms
782 and <= 10.24s. It will also silently enforce window <= interval
783
784 :param scan_type: Type of scanning. 0 => Passive (default)
785 1 => Active
786 :type scan_type: int
787 :param interval: Time in ms between the start of a scan and the next scan start. Default 10
788 :type interval: int/float
789 :param window: maximum advertising interval in ms. Default 10
790 :type window: int.float
791 :param oaddr_type: Type of own address Value 0 => public (default)
792 1 => Random
793 2 => Private with public fallback
794 3 => Private with random fallback
795 :type oaddr_type: int
796 :param filter: How white list filter is applied. 0 => No filter (Default)
797 1 => sender must be in white list
798 2 => Similar to 0. Some directed advertising may be received.
799 3 => Similar to 1. Some directed advertising may be received.
800 :type filter: int
801 :returns: HCI_Cmd_LE_Scan_Params instance.
802 :rtype: HCI_Cmd_LE_Scan_Params
803
804 """
805
806 def __init__(self,scan_type=0x0,interval=10, window=750, oaddr_type=0,filter=0):
807
808 super(self.__class__, self).__init__(b"\x08",b"\x0b")
809 self.payload.append(EnumByte("scan type",scan_type,
810 {0: "Passive",
811 1: "Active"}))
812 self.payload.append(UShortInt("Interval",int(round(min(10240,max(2.5,interval))/0.625)),endian="little"))
813 self.payload.append(UShortInt("Window",int(round(min(10240,max(2.5,min(interval,window)))/0.625)),endian="little"))
814 self.payload.append(EnumByte("own addresss type",oaddr_type,
815 {0: "Public",
816 1: "Random",
817 2: "Private IRK or Public",
818 3: "Private IRK or Random"}))
819 self.payload.append(EnumByte("filter policy",filter,
820 {0: "None",
821 1: "Sender In White List",
822 2: "Almost None",
823 3: "SIWL and some"}))
824
825
826class HCI_Cmd_LE_Advertise(HCI_Command):
827 """Class representing a command HCI command to enable/disable BLE advertising.
828
829 :param enable: enable/disable advertising.
830 :type enable: bool
831 :returns: HCI_Cmd_LE_Scan_Enable instance.
832 :rtype: HCI_Cmd_LE_Scan_Enable
833
834 """
835
836 def __init__(self,enable=True):
837 super(self.__class__, self).__init__(b"\x08",b"\x0a")
838 self.payload.append(Bool("enable",enable))
839
840class HCI_Cmd_LE_Set_Advertised_Msg(HCI_Command):
841 """Class representing an HCI command to set the advertised content.
842
843 :param enable: enable/disable advertising.
844 :type enable: bool
845 :returns: HCI_Cmd_LE_Scan_Enable instance.
846 :rtype: HCI_Cmd_LE_Scan_Enable
847
848 """
849
850 def __init__(self,msg=EmptyPayload()):
851 super(self.__class__, self).__init__(b"\x08",b"\x08")
852 self.payload.append(msg)
853
854class HCI_Cmd_LE_Set_Advertised_Params(HCI_Command):
855 """Class representing an HCI command to set the advertised parameters.
856
857 This will set a number of parameters relted to the advertising functions. For the
858 min and max intervals, it will always silently enforce the Specs that says it should be >= 20ms
859 and <= 10.24s. It will also silently enforce interval_max >= interval_min
860
861 :param interval_min: minimum advertising interval in ms. Default 500
862 :type interval_min: int/float
863 :param interval_max: maximum advertising interval in ms. Default 750
864 :type interval_max: int/float
865 :param adv_type: Type of advertising. Value 0 +> Connectable, Scannable advertising
866 1 => Connectable directed advertising (High duty cycle)
867 2 => Scannable Undirected advertising
868 3 => Non connectable undirected advertising (default)
869 :type adv_type: int
870 :param oaddr_type: Type of own address Value 0 => public (default)
871 1 => Random
872 2 => Private with public fallback
873 3 => Private with random fallback
874 :type oaddr_type: int
875 :param paddr_type: Type of peer address Value 0 => public (default)
876 1 => Random
877 :type paddr_type: int
878 :param peer_addr: Peer MAC address Default 00:00:00:00:00:00
879 :type peer_addr: str
880 :param cmap: Channel map. A bit field dfined as "Channel 37","Channel 38","Channel 39","RFU","RFU","RFU","RFU","RFU"
881 Default value is 0x7. The value 0x0 is RFU.
882 :type cmap: int
883 :param filter: How white list filter is applied. 0 => No filter (Default)
884 1 => scan are filtered
885 2 => Connection are filtered
886 3 => scan and connection are filtered
887 :type filter: int
888 :returns: HCI_Cmd_LE_Scan_Params instance.
889 :rtype: HCI_Cmd_LE_Scan_Params
890
891 """
892
893 def __init__(self,interval_min=500, interval_max=750,
894 adv_type=0x3, oaddr_type=0, paddr_type=0,
895 peer_addr="00:00:00:00:00:00", cmap=0x7, filter=0):
896
897 super(self.__class__, self).__init__(b"\x08",b"\x06")
898 self.payload.append(UShortInt("Adv minimum",int(round(min(10240,max(20,interval_min))/0.625)),endian="little"))
899 self.payload.append(UShortInt("Adv maximum",int(round(min(10240,max(20,max(interval_min,interval_max)))/0.625)),endian="little"))
900 self.payload.append(EnumByte("adv type",adv_type,
901 {0: "ADV_IND",
902 1: "ADV_DIRECT_IND high",
903 2: "ADV_SCAN_IND",
904 3: "ADV_NONCONN_IND",
905 4: "ADV_DIRECT_IND low"}))
906 self.payload.append(EnumByte("own addresss type",paddr_type,
907 {0: "Public",
908 1: "Random",
909 2: "Private IRK or Public",
910 3: "Private IRK or Random"}))
911 self.payload.append(EnumByte("peer addresss type",oaddr_type,
912 {0: "Public",
913 1: "Random"}))
914 self.payload.append(MACAddr("peer",mac=peer_addr))
915 self.payload.append(BitFieldByte("Channels",cmap,["Channel 37","Channel 38","Channel 39","RFU","RFU","RFU","RFU", "RFU"]))
916
917 self.payload.append(EnumByte("filter policy",filter,
918 {0: "None",
919 1: "Scan",
920 2: "Connection",
921 3: "Scan and Connection"}))
922
923class HCI_Cmd_Reset(HCI_Command):
924 """Class representing an HCI command to reset the adapater.
925
926
927 :returns: HCI_Cmd_Reset instance.
928 :rtype: HCI_Cmd_Reset
929
930 """
931
932 def __init__(self):
933 super(self.__class__, self).__init__(b"\x03",b"\x03")
934
935
936####
937# HCI EVents
938####
939
940class HCI_Event(Packet):
941
942 def __init__(self,code=0,payload=[]):
943 super().__init__(HCI_EVENT)
944 self.payload.append(Byte("code"))
945 self.payload.append(UIntByte("length"))
946
947 def decode(self,data):
948 data=super().decode(data)
949 if data is None:
950 return None
951
952 for x in self.payload:
953 x.decode(data[:len(x)])
954 data=data[len(x):]
955 code=self.payload[0]
956 length=self.payload[1].val
957 if code.val==b"\x0e":
958 ev = HCI_CC_Event()
959 data=ev.decode(data)
960 self.payload.append(ev)
961 elif code.val==b"\x3e":
962 ev = HCI_LE_Meta_Event()
963 data=ev.decode(data)
964 self.payload.append(ev)
965 else:
966 ev=Itself("Payload")
967 data=ev.decode(data)
968 self.payload.append(ev)
969 return data
970
971 def show(self,depth=0):
972 print("{}HCI Event:".format(PRINT_INDENT*depth))
973 for x in self.payload:
974 x.show(depth+1)
975
976
977class HCI_CC_Event(Packet):
978 """Command Complete event"""
979 def __init__(self):
980 self.name="Command Completed"
981 self.payload=[UIntByte("allow pkt"),OgfOcf("cmd"),Itself("resp code")]
982
983
984 def decode(self,data):
985 for x in self.payload:
986 data=x.decode(data)
987 return data
988
989 def show(self,depth=0):
990 for x in self.payload:
991 x.show(depth+1)
992
993class HCI_LE_Meta_Event(Packet):
994 def __init__(self):
995 self.name="LE Meta"
996 self.payload=[Byte("code")]
997
998 def decode(self,data):
999 for x in self.payload:
1000 data=x.decode(data)
1001 code=self.payload[0]
1002 if code.val==b"\x02":
1003 ev=HCI_LEM_Adv_Report()
1004 data=ev.decode(data)
1005 self.payload.append(ev)
1006 else:
1007 ev=Itself("Payload")
1008 data=ev.decode(data)
1009 self.payload.append(ev)
1010 return data
1011
1012 def show(self,depth=0):
1013 print("{}{}:".format(PRINT_INDENT*depth,self.name))
1014 for x in self.payload:
1015 x.show(depth+1)
1016
1017
1018class HCI_LEM_Adv_Report(Packet):
1019 def __init__(self):
1020 self.name="Adv Report"
1021 self.payload=[UIntByte("num reports"),
1022 EnumByte("ev type",0,{0:"generic adv", 3:"no connection adv", 4:"scan rsp"}),
1023 EnumByte("addr type",0,{0:"public", 1:"random"}),
1024 MACAddr("peer"),UIntByte("length")]
1025
1026
1027 def decode(self,data):
1028
1029 for x in self.payload:
1030 data=x.decode(data)
1031 #Now we have a sequence of len, type data with possibly a RSSI byte at the end
1032 while len(data) > 1:
1033 length=UIntByte("sublen")
1034 data=length.decode(data)
1035 code=EIR_Hdr()
1036 data=code.decode(data)
1037
1038 if code.val == 0x01:
1039 #Flag
1040 myinfo=BitFieldByte("flags",0,["Undef","Undef","Simul LE - BR/EDR (Host)","Simul LE - BR/EDR (Control.)","BR/EDR Not Supported",
1041 "LE General Disc.","LE Limited Disc."])
1042 xx=myinfo.decode(data[:length.val-len(code)])
1043 self.payload.append(myinfo)
1044 elif code.val == 0x02:
1045 myinfo=NBytes_List("Incomplete uuids",2)
1046 xx=myinfo.decode(data[:length.val-len(code)])
1047 self.payload.append(myinfo)
1048 elif code.val == 0x03:
1049 myinfo=NBytes_List("Complete uuids",2)
1050 xx=myinfo.decode(data[:length.val-len(code)])
1051 self.payload.append(myinfo)
1052 elif code.val == 0x04:
1053 myinfo=NBytes_List("Incomplete uuids",4)
1054 xx=myinfo.decode(data[:length.val-len(code)])
1055 self.payload.append(myinfo)
1056 elif code.val == 0x05:
1057 myinfo=NBytes_List("Complete uuids",4)
1058 xx=myinfo.decode(data[:length.val-len(code)])
1059 self.payload.append(myinfo)
1060 elif code.val == 0x06:
1061 myinfo=NBytes_List("Incomplete uuids",16)
1062 xx=myinfo.decode(data[:length.val-len(code)])
1063 self.payload.append(myinfo)
1064 elif code.val == 0x07:
1065 myinfo=NBytes_List("Complete uuids",16)
1066 xx=myinfo.decode(data[:length.val-len(code)])
1067 self.payload.append(myinfo)
1068 elif code.val == 0x14:
1069 myinfo=NBytes_List("Service Solicitation uuid",2)
1070 xx=myinfo.decode(data[:length.val-len(code)])
1071 self.payload.append(myinfo)
1072 elif code.val == 0x16:
1073 myinfo=Adv_Data("Advertised Data",2)
1074 xx=myinfo.decode(data[:length.val-len(code)])
1075 self.payload.append(myinfo)
1076 elif code.val == 0x1f:
1077 myinfo=NBytes_List("Service Solicitation uuid",4)
1078 xx=myinfo.decode(data[:length.val-len(code)])
1079 self.payload.append(myinfo)
1080 elif code.val == 0x20:
1081 myinfo=Adv_Data("Advertised Data",4)
1082 xx=myinfo.decode(data[:length.val-len(code)])
1083 self.payload.append(myinfo)
1084 elif code.val == 0x15:
1085 myinfo=NBytes_List("Service Solicitation uuid",16)
1086 xx=myinfo.decode(data[:length.val-len(code)])
1087 self.payload.append(myinfo)
1088 elif code.val == 0x21:
1089 myinfo=Adv_Data("Advertised Data",16)
1090 xx=myinfo.decode(data[:length.val-len(code)])
1091 self.payload.append(myinfo)
1092 elif code.val == 0x08:
1093 myinfo=String("Short Name")
1094 xx=myinfo.decode(data[:length.val-len(code)])
1095 self.payload.append(myinfo)
1096 elif code.val == 0x09:
1097 myinfo=String("Complete Name")
1098 xx=myinfo.decode(data[:length.val-len(code)])
1099 self.payload.append(myinfo)
1100 else:
1101 myinfo=Itself("Payload for %s"%code.strval)
1102 xx=myinfo.decode(data[:length.val-len(code)])
1103 self.payload.append(myinfo)
1104
1105 data=data[length.val-len(code):]
1106 if data:
1107 myinfo=IntByte("rssi")
1108 data=myinfo.decode(data)
1109 self.payload.append(myinfo)
1110 return data
1111
1112 def show(self,depth=0):
1113 print("{}{}:".format(PRINT_INDENT*depth,self.name))
1114 for x in self.payload:
1115 x.show(depth+1)
1116
1117class EIR_Hdr(Packet):
1118 def __init__(self):
1119 self.type= EnumByte("type", 0, {
1120 0x01: "flags",
1121 0x02: "incomplete_list_16_bit_svc_uuids",
1122 0x03: "complete_list_16_bit_svc_uuids",
1123 0x04: "incomplete_list_32_bit_svc_uuids",
1124 0x05: "complete_list_32_bit_svc_uuids",
1125 0x06: "incomplete_list_128_bit_svc_uuids",
1126 0x07: "complete_list_128_bit_svc_uuids",
1127 0x08: "shortened_local_name",
1128 0x09: "complete_local_name",
1129 0x0a: "tx_power_level",
1130 0x0d: "class_of_device",
1131 0x0e: "simple_pairing_hash",
1132 0x0f: "simple_pairing_rand",
1133 0x10: "sec_mgr_tk",
1134 0x11: "sec_mgr_oob_flags",
1135 0x12: "slave_conn_intvl_range",
1136 0x17: "pub_target_addr",
1137 0x18: "rand_target_addr",
1138 0x19: "appearance",
1139 0x1a: "adv_intvl",
1140 0x1b: "le_addr",
1141 0x1c: "le_role",
1142 0x14: "list_16_bit_svc_sollication_uuids",
1143 0x1f: "list_32_bit_svc_sollication_uuids",
1144 0x15: "list_128_bit_svc_sollication_uuids",
1145 0x16: "svc_data_16_bit_uuid",
1146 0x20: "svc_data_32_bit_uuid",
1147 0x21: "svc_data_128_bit_uuid",
1148 0x22: "sec_conn_confirm",
1149 0x23: "sec_conn_rand",
1150 0x24: "uri",
1151 0xff: "mfg_specific_data",
1152 })
1153
1154 def decode(self,data):
1155 return self.type.decode(data)
1156
1157 def show(self):
1158 return self.type.show()
1159
1160 @property
1161 def val(self):
1162 return self.type.val
1163
1164 @property
1165 def strval(self):
1166 return self.type.strval
1167
1168 def __len__(self):
1169 return len(self.type)
1170
1171class Adv_Data(Packet):
1172 def __init__(self,name,length):
1173 self.name=name
1174 self.length=length
1175 self.payload=[]
1176
1177 def decode(self,data):
1178 myinfo=NBytes("Service Data uuid",self.length)
1179 data=myinfo.decode(data)
1180 self.payload.append(myinfo)
1181 if data:
1182 myinfo=Itself("Adv Payload")
1183 data=myinfo.decode(data)
1184 self.payload.append(myinfo)
1185 return data
1186
1187 def show(self,depth=0):
1188 print("{}{}:".format(PRINT_INDENT*depth,self.name))
1189 for x in self.payload:
1190 x.show(depth+1)
1191
1192 def __len__(self):
1193 resu=0
1194 for x in self.payload:
1195 resu+=len(x)
1196 return resu
1197
1198
1199
1200#
1201# The defs are over. Now the realstuffs
1202#
1203
1204def create_bt_socket(interface=0):
1205 exceptions = []
1206 sock = None
1207 try:
1208 sock = socket.socket(family=socket.AF_BLUETOOTH,
1209 type=socket.SOCK_RAW,
1210 proto=socket.BTPROTO_HCI)
1211 sock.setblocking(False)
1212 sock.setsockopt(socket.SOL_HCI, socket.HCI_FILTER, pack("IIIh2x", 0xffffffff,0xffffffff,0xffffffff,0)) #type mask, event mask, event mask, opcode
1213 try:
1214 sock.bind((interface,))
1215 except OSError as exc:
1216 exc = OSError(
1217 exc.errno, 'error while attempting to bind on '
1218 'interface {!r}: {}'.format(
1219 interface, exc.strerror))
1220 exceptions.append(exc)
1221 except OSError as exc:
1222 if sock is not None:
1223 sock.close()
1224 exceptions.append(exc)
1225 except:
1226 if sock is not None:
1227 sock.close()
1228 raise
1229 if len(exceptions) == 1:
1230 raise exceptions[0]
1231 elif len(exceptions) > 1:
1232 model = str(exceptions[0])
1233 if all(str(exc) == model for exc in exceptions):
1234 raise exceptions[0]
1235 raise OSError('Multiple exceptions: {}'.format(
1236 ', '.join(str(exc) for exc in exceptions)))
1237 return sock
1238
1239###########
1240
1241class BLEScanRequester(asyncio.Protocol):
1242 '''Protocol handling the requests'''
1243 def __init__(self):
1244 self.transport = None
1245 self.smac = None
1246 self.sip = None
1247 self.process = self.default_process
1248
1249 def connection_made(self, transport):
1250 self.transport = transport
1251 command=HCI_Cmd_LE_Set_Scan_Params()
1252 self.transport.write(command.encode())
1253
1254 def connection_lost(self, exc):
1255 super().connection_lost(exc)
1256
1257 def send_scan_request(self):
1258 '''Sending LE scan request'''
1259 command=HCI_Cmd_LE_Scan_Enable(True,False)
1260 self.transport.write(command.encode())
1261
1262 def stop_scan_request(self):
1263 '''Sending LE scan request'''
1264 command=HCI_Cmd_LE_Scan_Enable(False,False)
1265 self.transport.write(command.encode())
1266
1267 def send_command(self,command):
1268 '''Sending an arbitrary command'''
1269 self.transport.write(command.encode())
1270
1271 def data_received(self, packet):
1272 self.process(packet)
1273
1274 def default_process(self,data):
1275 pass
diff --git a/checkbox_support/vendor/aioblescan/eddystone.py b/checkbox_support/vendor/aioblescan/eddystone.py
0new file mode 1006441276new file mode 100644
index 0000000..3cc0d2c
--- /dev/null
+++ b/checkbox_support/vendor/aioblescan/eddystone.py
@@ -0,0 +1,362 @@
1#!/usr/bin/env python3
2# -*- coding:utf-8 -*-
3#
4# This file deal with EddyStone formated message
5#
6# Copyright (c) 2017 François Wautier
7#
8# Note part of this code was adapted from PyBeacon (https://github.com/nirmankarta/PyBeacon)
9#
10# Permission is hereby granted, free of charge, to any person obtaining a copy
11# of this software and associated documentation files (the "Software"), to deal
12# in the Software without restriction, including without limitation the rights
13# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
14# of the Software, and to permit persons to whom the Software is furnished to do so,
15# subject to the following conditions:
16#
17# The above copyright notice and this permission notice shall be included in all copies
18# or substantial portions of the Software.
19#
20# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
24# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
25# IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE
26
27import checkbox_support.vendor.aioblescan as aios
28from urllib.parse import urlparse
29from enum import Enum
30
31#
32EDDY_UUID=b"\xfe\xaa" #Google UUID
33
34class ESType(Enum):
35 """Enumerator for Eddystone types."""
36 uid = 0x00
37 url = 0x10
38 tlm = 0x20
39 eid = 0x30
40
41url_schemes = [
42 ("http",True),
43 ("https",True),
44 ("http",False),
45 ("https",False),
46 ]
47
48url_domain = [
49 "com","org","edu","net","info","biz","gov"]
50
51class EddyStone(object):
52 """Class defining the content of an EddyStone advertisement.
53
54 Here the param type will depend on the type.
55
56 For URL it should be a string with a compatible URL.
57
58 For UID it is a dictionary with 2 keys, "namespace" and "instance", values are bytes .
59
60 For TLM it shall be an dictionary with 4 keys: "battery","temperature", "count" and "uptime".
61 Any missing key shall be replaced by its default value.
62
63 For EID it should me a bytes string of length 8
64
65 :param type: The type of EddyStone advertisement. From ESType
66 :type type: ESType
67 :oaram param: The payload corresponding to the type
68
69 """
70
71 def __init__(self, type=ESType.url, param="https://goo.gl/m9UiEA"):
72 self.power = 0
73 self.payload = [] #As defined in https://github.com/google/eddystone/blob/master/protocol-specification.md
74 self.payload.append(aios.Byte("Flag Length",b'\x02'))
75 self.payload.append(aios.Byte("Flag Data Type",b'\x01'))
76 self.payload.append(aios.Byte("Flag Data",b'\x1a'))
77 self.payload.append(aios.Byte("Length UUID services",b'\x03'))
78 self.payload.append(aios.Byte("Complete List UUID Service",b'\x03'))
79 self.payload.append(aios.Byte("Eddystone UUID",b'\xaa'))
80 self.payload.append(aios.Byte("...",b'\xfe'))
81 self.service_data_length = aios.IntByte("Service Data length",4)
82 self.payload.append(self.service_data_length)
83 self.payload.append(aios.Byte("Service Data data type value",b'\x16'))
84 self.payload.append(aios.Byte("Eddystone UUID",b'\xaa'))
85 self.payload.append(aios.Byte("...",b'\xfe'))
86 self.type = aios.EnumByte("type",type.value,{ESType.uid.value:"Eddystone-UID",
87 ESType.url.value:"Eddystone-URL",
88 ESType.tlm.value:"Eddystone-TLM",
89 ESType.eid.value:"Eddystone-EID"})
90 self.payload.append(self.type)
91 self.parsed_payload = b''
92 self.type_payload=param
93
94 def change_type(self, type, param):
95 self.type.val=type.value
96 self.type_payload=param
97 self.service_data_length.val=4
98 self.parsed_payload = b''
99
100 def change_type_payload(self, param):
101 self.type_payload=param
102 self.service_data_length.val=4
103 self.parsed_payload = b''
104
105 def url_encoder(self):
106 encodedurl = []
107 encodedurl.append(aios.IntByte("Tx Power",self.power))
108 asisurl=""
109 myurl = urlparse(self.type_payload)
110 myhostname = myurl.hostname
111 mypath = myurl.path
112 if (myurl.scheme,myhostname.startswith("www.")) in url_schemes:
113 encodedurl.append(aios.IntByte("URL Scheme",
114 url_schemes.index((myurl.scheme,myhostname.startswith("www.")))))
115 if myhostname.startswith("www."):
116 myhostname = myhostname[4:]
117
118 extval=None
119 if myhostname.split(".")[-1] in url_domain:
120 extval = url_domain.index(myhostname.split(".")[-1])
121 myhostname = ".".join(myhostname.split(".")[:-1])
122 if extval and not mypath.startswith("/"):
123 extval+=7
124 else:
125 if myurl.port is None:
126 if extval:
127 mypath = mypath[1:]
128 else:
129 extval += 7
130 encodedurl.append(aios.String("URL string"))
131 encodedurl[-1].val = myhostname
132 if extval:
133 encodedurl.append(aios.IntByte("URL Extention",extval))
134
135 if myurl.port:
136 asisurl += ":"+str(myurl.port)+mypath
137 asisurl += mypath
138 if myurl.params:
139 asisurl += ";"+myurl.params
140 if myurl.query:
141 asisurl += "?"+myurl.query
142 if myurl.fragment:
143 asisurl += "#"+myurl.fragment
144 encodedurl.append(aios.String("Rest of URL"))
145 encodedurl[-1].val = asisurl
146 tlength=0
147 for x in encodedurl: #Check the payload length
148 tlength += len(x)
149 if tlength > 19: #Actually 18 but we have tx power
150 raise Exception("Encoded url too long (max 18 bytes)")
151 self.service_data_length.val += tlength #Update the payload length
152 return encodedurl
153
154
155 def uid_encoder(self):
156 encodedurl = []
157 encodedurl.append(aios.IntByte("Tx Power",self.power))
158 encodedurl.append(aios.NBytes("Namespace",10))
159 encodedurl[-1].val = self.type_payload["namespace"]
160 encodedurl.append(aios.NBytes("Instance",6))
161 encodedurl[-1].val = self.type_payload["instance"]
162 encodedurl.append(aios.NBytes("RFU",2))
163 encodedurl[-1].val = b'\x00\x00'
164 self.service_data_length.val = 23 #Update the payload length/ways the same for uid
165 return encodedurl
166
167 def tlm_encoder(self):
168 encodedurl = []
169 encodedurl.append(aios.NBytes("VBATT",2))
170 if "battery" in self.type_payload:
171 encodedurl[-1].val = self.type_payload["battery"]
172 else:
173 encodedurl[-1].val = -128
174 encodedurl.append(aios.Float88("Temperature"))
175 if "temperature" in self.type_payload:
176 encodedurl[-1].val = self.type_payload["temperature"]
177 else:
178 encodedurl[-1].val = -128
179
180 encodedurl.append(aios.ULongInt("Count"))
181 if "count" in self.type_payload:
182 encodedurl[-1].val = self.type_payload["count"]
183 else:
184 encodedurl[-1].val = 0
185
186 encodedurl.append(aios.ULongInt("Uptime"))
187 if "uptime" in self.type_payload:
188 encodedurl[-1].val = self.type_payload["uptime"]
189 else:
190 encodedurl[-1].val = 0
191 return encodedurl
192
193
194 def eid_encoder(self):
195 encodedurl = []
196 encodedurl.append(aios.IntByte("Tx Power",self.power))
197 encodedurl.append(aios.NBytes("Namespace",8))
198 encodedurl[-1].val = self.type_payload
199 self.service_data_length.val = 13
200 return encodedurl
201
202
203 def encode(self):
204 #Generate the payload
205 if self.type.val == ESType.uid.value:
206 espayload = self.uid_encoder()
207 elif self.type.val == ESType.url.value:
208 espayload = self.url_encoder()
209 elif self.type.val == ESType.tlm.value:
210 espayload = self.tlm_encoder()
211 elif self.type.val == ESType.eid.value:
212 espayload = self.eid_encoder()
213 encmsg=b''
214 for x in self.payload+espayload:
215 encmsg += x.encode()
216 mylen=aios.IntByte("Length",len(encmsg))
217 encmsg = mylen.encode()+encmsg
218 for x in range(32-len(encmsg)):
219 encmsg+=b'\x00'
220 return encmsg
221
222 def decode(self, packet):
223 """Check a parsed packet and figure out if it is an Eddystone Beacon.
224 If it is , return the relevant data as a dictionary.
225
226 Return None, it is not an Eddystone Beacon advertising packet"""
227
228 ssu=packet.retrieve("Complete uuids")
229 found=False
230 for x in ssu:
231 if EDDY_UUID in x:
232 found=True
233 break
234 if not found:
235 return None
236
237 found=False
238 adv=packet.retrieve("Advertised Data")
239 for x in adv:
240 luuid=x.retrieve("Service Data uuid")
241 for uuid in luuid:
242 if EDDY_UUID == uuid:
243 found=x
244 break
245 if found:
246 break
247
248
249 if not found:
250 return None
251
252 try:
253 top=found.retrieve("Adv Payload")[0]
254 except:
255 return None
256 #Rebuild that part of the structure
257 found.payload.remove(top)
258 #Now decode
259 result={}
260 data=top.val
261 etype = aios.EnumByte("type",self.type.val,{ESType.uid.value:"Eddystone-UID",
262 ESType.url.value:"Eddystone-URL",
263 ESType.tlm.value:"Eddystone-TLM",
264 ESType.eid.value:"Eddystone-EID"})
265 data=etype.decode(data)
266 found.payload.append(etype)
267 if etype.val== ESType.uid.value:
268 power=aios.IntByte("tx_power")
269 data=power.decode(data)
270 found.payload.append(power)
271 result["tx_power"]=power.val
272
273 nspace=aios.Itself("namespace")
274 xx=nspace.decode(data[:10]) #According to https://github.com/google/eddystone/tree/master/eddystone-uid
275 data=data[10:]
276 found.payload.append(nspace)
277 result["name space"]=nspace.val
278
279 nspace=aios.Itself("instance")
280 xx=nspace.decode(data[:6]) #According to https://github.com/google/eddystone/tree/master/eddystone-uid
281 data=data[6:]
282 found.payload.append(nspace)
283 result["instance"]=nspace.val
284
285 elif etype.val== ESType.url.value:
286 power=aios.IntByte("tx_power")
287 data=power.decode(data)
288 found.payload.append(power)
289 result["tx_power"]=power.val
290
291 url=aios.EnumByte("type",0,{0x00:"http://www.",0x01:"https://www.",0x02:"http://",0x03:"https://"})
292 data=url.decode(data)
293 result["url"]=url.strval
294 for x in data:
295 if bytes([x]) == b"\x00":
296 result["url"]+=".com/"
297 elif bytes([x]) == b"\x01":
298 result["url"]+=".org/"
299 elif bytes([x]) == b"\x02":
300 result["url"]+=".edu/"
301 elif bytes([x]) == b"\x03":
302 result["url"]+=".net/"
303 elif bytes([x]) == b"\x04":
304 result["url"]+=".info/"
305 elif bytes([x]) == b"\x05":
306 result["url"]+=".biz/"
307 elif bytes([x]) == b"\x06":
308 result["url"]+=".gov/"
309 elif bytes([x]) == b"\x07":
310 result["url"]+=".com"
311 elif bytes([x]) == b"\x08":
312 result["url"]+=".org"
313 elif bytes([x]) == b"\x09":
314 result["url"]+=".edu"
315 elif bytes([x]) == b"\x10":
316 result["url"]+=".net"
317 elif bytes([x]) == b"\x11":
318 result["url"]+=".info"
319 elif bytes([x]) == b"\x12":
320 result["url"]+=".biz"
321 elif bytes([x]) == b"\x13":
322 result["url"]+=".gov"
323 else:
324 result["url"]+=chr(x) #x.decode("ascii") #Yep ASCII only
325 url=aios.String("url")
326 url.decode(result["url"])
327 found.payload.append(url)
328 elif etype.val== ESType.tlm.value:
329 myinfo=aios.IntByte("version")
330 data=myinfo.decode(data)
331 found.payload.append(myinfo)
332 myinfo=aios.ShortInt("battery")
333 data=myinfo.decode(data)
334 result["battery"]=myinfo.val
335 found.payload.append(myinfo)
336 myinfo=aios.Float88("temperature")
337 data=myinfo.decode(data)
338 found.payload.append(myinfo)
339 result["temperature"]=myinfo.val
340 myinfo=aios.LongInt("pdu count")
341 data=myinfo.decode(data)
342 found.payload.append(myinfo)
343 result["pdu count"]=myinfo.val
344 myinfo=aios.LongInt("uptime")
345 data=myinfo.decode(data)
346 found.payload.append(myinfo)
347 result["uptime"]=myinfo.val*100 #in msecs
348 return result
349 #elif etype.val== ESType.tlm.eid:
350 else:
351 result["data"]=data
352 xx=Itself("data")
353 xx.decode(data)
354 found.payload.append(xx)
355
356 rssi=packet.retrieve("rssi")
357 if rssi:
358 result["rssi"]=rssi[-1].val
359 mac=packet.retrieve("peer")
360 if mac:
361 result["mac address"]=mac[-1].val
362 return result
diff --git a/setup.py b/setup.py
index c2517fd..153e8e0 100755
--- a/setup.py
+++ b/setup.py
@@ -90,6 +90,8 @@ setup(
90 "checkbox_support.scripts.nmea_test:main"),90 "checkbox_support.scripts.nmea_test:main"),
91 ("checkbox-support-snap_connect="91 ("checkbox-support-snap_connect="
92 "checkbox_support.scripts.snap_connect:main"),92 "checkbox_support.scripts.snap_connect:main"),
93 ("checkbox-support-eddystone_scanner="
94 "checkbox_support.scripts.eddystone_scanner:main"),
93 ],95 ],
94 },96 },
95)97)

Subscribers

People subscribed via source and target branches