見出し画像

pythonによるBLEマルチペアリング

こんにちは.のぶです.

今回はBLE(Bluetooth Low Energy)のマルチペアリングをpythonでやる話をします.

BLEって?

最近のBluetoothのことです(雑).

Bluetooth 4.0以上の規格をBLEと呼びます.以前のBluetoothが大容量データ送信に長けてるのに対し,BLEは小容量データ送信・省電力の特徴があります.だから「Bluetooth Low Energy」

マルチペアリングって?

1つの機器に複数個の機器を接続することです.無線のキーボードと無線のマウスをパソコンに同時に接続しながら使うイメージです.

ただ,この記事で対象にしているのは,例えば光センサーや温度センサー,複数のマイコンがあって,それらの情報を同時にBLEで親機に集め,なにかしらのIoTシステムを作りたい人などです.マウスなどのHID機器はよく知らんです.

なんでpython?

スマホアプリでマルチペアリングする記事は散見するものの,C++やpythonなどのマイコンに使われる言語では情報が乏しかったので書いてみました.

pythonはラズパイやパソコンで簡単に動かせますから,趣味でモノづくりする人にとってちょうど良いかと思います.

使用モジュール

・bleak
・asyncio
※python 3.7以上

BLE接続にはbleakというモジュールを使います.

理由はasyncioというモジュールを使っているからです.

では,asyncioというモジュールは何かというと,

非同期を処理を行うためのモジュールになります.

引用:http://ossforum.jp/node/753
非同期処理は、あるタスクが実行をしている際に、他のタスクが別の処理を実行できる方式である。 同期処理では、あるタスクが実行している間、他のタスクの処理は中断される方式である。

ある処理をやっている間に別の処理もやる,ということです.pythonは基本的に同期処理です.

結論から言うと,マルチペアリングを行うためには,非同期処理にしないといけません.そこでasyncioというモジュールを使っているんですね.

全体のコード

import asyncio
from bleak import discover, BleakClient

device1 = "xxxxxxxx"
device2 = "xxxxxxxx"
...

class Device1Handler(BleakClient):
   """
       Args:
       address_or_device (`BLEDevice` or str): the Bleak device returned after scanning, or a bluetooth device address.
   """
   def __init__(self, address_or_device):
       self._client = BleakClient(address_or_device)
       sself.xxxx = xxx

   def _device1_cb(self, sender, data):
       """
           Write callback device1 function
           For example:
           self.xxx = struct.unpack("hhb",data)
       """
       if self._device1_cb is not None:
           self._device1_cb(self.data)
       else:
           print("device_cb is None")

   async def connect(self, timeout):
       """Connect to the thermometer device.
       Args:
           timeout (float or None): The timeout to connect to the device in seconds, or None to wait forever.
       Returns:
           Boolean representing the connection status.
       """
       return await self._client.connect(timeout=timeout)

   async def disconnect(self):
       """Disconnect from the thermometer device.
       Returns:
           Boolean representing if device is disconnected.
       """
       self._deivce1_cb = None
       return await self._client.disconnect()

   async def start(self, device1_cb):
       """Setup and/or read from the device.
       Returns right away if the device is not connected.
       """
       if not await self._client.is_connected():
           return
       """
       Write what do you want
       """
.............

async def paring_func(device):
   if device.name == device1:
       dev = Device1Handler(device)
       cb = device1_cb
   elif device.name == device2:
       dev = Device2Handler(device)
       cb = device2_cb
   
   ...
   
   else:
       print("Unknown device")
       return
   try:
       await dev.connect(None)
   except bleak.exc.BleakError:
       print("Connection Error")
       return
   await dev.start(cb)
   await asyncio.sleep(5)
   await dev.disconnect()

def device1_cb(xxx):
    """
    Write device1 callback
    """

.....

async def main():
   names: tuple = (device1, device2, ....)
   paring_tasks: list = []
   while True:
       print("Start scan")
       devices = await discover()
       for d in devices:
           if d.name.startswith(names):
               paring_tasks.append(asyncio.create_task(paring_func(device=d)))
       if paring_tasks:
           [await task for task in paring_tasks]
       paring_tasks = []

asyncio.run(main())


コードの説明

ざっくり説明すると,

各デバイスに対応するDeviceHandlerクラスを作成

class Device1Handler(BleakClient):

ペアリングする関数も定義しとく

async def paring_func(device):
   if device.name == device1:
       dev = Device1Handler(device)
       cb = device1_cb
   elif device.name == device2:
       dev = Device2Handler(device)
       cb = device2_cb
   
   ...
   
   else:
       print("Unknown device")
       return
   try:
       await dev.connect(None)
   except bleak.exc.BleakError:
       print("Connection Error")
       return
   await dev.start(cb)
   await asyncio.sleep(5)
   await dev.disconnect()


デバイスの名前,またはアドレスを予め用意し,スキャンを開始して,デバイスがあればペアリングタスクにペアリングする関数を追加していく

device1 = "xxxxxxxx"
device2 = "xxxxxxxx"
...

async def main():
   names: tuple = (device1, device2, ....)
   paring_tasks: list = []
   while True:
       print("Start scan")
       devices = await discover()
       for d in devices:
           if d.name.startswith(names):
               paring_tasks.append(asyncio.create_task(paring_func(device=d))

そして最後にペアリングタスクの実行をします

        if paring_tasks:
           [await task for task in paring_tasks]
       paring_tasks = []

ここでミソとなるのは,ペアリングをする関数をasyncio.create_taskメソッドでTaskオブジェクト化すること.そして,paring_tasksリストにそれを格納して最後に実行することです.

接続→データ送受信→切断の一連の流れを1つのタスクとして機器ごとに登録することで,機器ごとに独立した通信が行えます.

一つの機器との通信に時間がかかっていても,他の機器との通信が遅くなることはありません.

asyncioのTaskオブジェクトについて
https://docs.python.org/ja/3/library/asyncio-task.html


要は,マルチペアリングするには非同期で機器にコネクトすればいいのです.

上記のコードでは5秒間隔でスキャンし,ペアリングタスクを行っています.

各機器とのデータ通信が終わるまでスキャンを行いません.ペアリングも,callback関数が実行されると接続を解除します.

ペアリングを維持したまま,定期的にスキャンしたり,あるデバイスは切ってあるデバイスは接続を永久的に続けるなどしたい方はコードをアレンジしてみてください.

本記事が開発の助けになれば良いです.

終わり

この記事が気に入ったらサポートをしてみませんか?