Usage Examples#

Alert#

"""Turn the alert on Mi Band device
"""
from kivy.app import App
from kivy.uix.button import Button

from able import BluetoothDispatcher, GATT_SUCCESS
from error_message import install_exception_handler


class BLE(BluetoothDispatcher):
    """Dispatcher that finds a Mi Band device and writes a "High Alert" to it.

    Class attributes:
        device: the device remembered from the scan, or None.
        alert_characteristic: the discovered "Alert Level" characteristic,
            or None while it is unknown.
    """
    device = alert_characteristic = None

    def start_alert(self, *args, **kwargs):
        """Send the alert, reconnecting or scanning first when necessary."""
        if self.alert_characteristic:  # alert service is already discovered
            self.alert(self.alert_characteristic)
        elif self.device:  # device was already found during the scan
            self.connect_gatt(self.device)  # reconnect
        else:
            self.stop_scan()  # stop previous scan
            self.start_scan()  # start a scan for devices

    def on_device(self, device, rssi, advertisement):
        """Remember the first device whose name starts with 'MI' and stop scanning."""
        # some device is found during the scan
        name = device.getName()
        if name and name.startswith('MI'):  # is a Mi Band device
            self.device = device
            self.stop_scan()

    def on_scan_completed(self):
        """Connect to the remembered device, if the scan found one."""
        if self.device:
            self.connect_gatt(self.device)  # connect to device

    def on_connection_state_change(self, status, state):
        """Discover services on connection; reset state on disconnection or error."""
        if status == GATT_SUCCESS and state:  # connection established
            self.discover_services()  # discover what services a device offer
        else:  # disconnection or error
            self.alert_characteristic = None
            self.close_gatt()  # close current connection

    def on_services(self, status, services):
        """Look up the "Alert Level" characteristic and send the alert.

        Nothing is written when discovery failed or when the characteristic
        is not offered by the device.
        """
        if status != GATT_SUCCESS:  # discovery failed, nothing to search in
            self.alert_characteristic = None
            return
        # 0x2a06 is a standard code for "Alert Level" characteristic
        # https://www.bluetooth.com/specifications/gatt/viewer?attributeXmlFile=org.bluetooth.characteristic.alert_level.xml
        self.alert_characteristic = services.search('2a06')
        # NOTE(review): `search` is expected to return a falsy value when
        # nothing matches - confirm against the library documentation.
        if self.alert_characteristic:
            self.alert(self.alert_characteristic)

    def alert(self, characteristic):
        """Write the "High Alert" level to the given characteristic."""
        self.write_characteristic(characteristic, [2])  # 2 is for "High Alert"


class AlertApp(App):
    """Single-button application that triggers the alert on press."""

    def build(self):
        """Return the root button; the BLE dispatcher is created on first press."""
        self.ble = None
        alert_button = Button(
            text='Press to Alert Mi',
            on_press=self.start_alert,
        )
        return alert_button

    def start_alert(self, *args, **kwargs):
        """Button handler: make sure a dispatcher exists, then start the alert."""
        # Reuse the existing dispatcher, create one only when there is none yet
        self.ble = self.ble or BLE()
        self.ble.start_alert()


if __name__ == '__main__':
    # Install the exception handler before the application starts
    install_exception_handler()
    AlertApp().run()

Full example code: alert

Change MTU#

"""Request MTU change, and write 100 bytes to a characteristic."""
from kivy.app import App
from kivy.clock import Clock
from kivy.logger import Logger
from kivy.uix.widget import Widget

from able import BluetoothDispatcher, GATT_SUCCESS


class BLESender(BluetoothDispatcher):
    """Connect to a fixed address, request a bigger MTU and write 100 bytes."""

    def __init__(self):
        super().__init__()
        # Characteristic found during service discovery; None until then
        self.characteristic_to_write = None
        # Run the connection attempt from the Clock, on the next frame
        Clock.schedule_once(self.connect, 0)

    def connect(self, _):
        """Clock callback: connect to the device with the hard-coded address."""
        self.connect_by_device_address("FF:FF:FF:FF:FF:FF")

    def on_connection_state_change(self, status, state):
        """Start service discovery once the connection is established."""
        if status == GATT_SUCCESS and state:
            self.discover_services()

    def on_services(self, status, services):
        """Find the characteristic to write and request the MTU change."""
        if status != GATT_SUCCESS:
            return
        self.characteristic_to_write = services.search("0d03")
        if not self.characteristic_to_write:
            # Without a target characteristic the later write cannot succeed,
            # so do not bother requesting the MTU change.
            Logger.error("Characteristic to write is not found")
            return
        # Need to request 100 + 3 extra bytes for ATT packet header
        self.request_mtu(103)

    def on_mtu_changed(self, mtu, status):
        """Write the 100 bytes payload when the MTU is big enough."""
        # The negotiated MTU may be larger than the requested value
        # (the platform can pick its own size); anything >= 103 is sufficient.
        if status == GATT_SUCCESS and mtu >= 103:
            Logger.info("MTU changed: now it is possible to send 100 bytes at once")
            self.write_characteristic(self.characteristic_to_write, range(100))
        else:
            Logger.error("MTU not changed: mtu=%d, status=%d", mtu, status)

    def on_characteristic_write(self, characteristic, status):
        """Log the result of the write operation."""
        if status == GATT_SUCCESS:
            Logger.info("Characteristic write succeed")
        else:
            Logger.error("Write status: %d", status)


class MTUApp(App):
    """Application that only hosts the `BLESender` dispatcher."""

    def build(self):
        """Create the sender and return an empty root widget."""
        # Keep a reference to the sender: it schedules a bound method on the
        # Kivy Clock, and the Clock does not keep such callbacks alive.
        self.ble_sender = BLESender()
        return Widget()


if __name__ == '__main__':
    MTUApp().run()

Scan settings#

from able import BluetoothDispatcher
from able.scan_settings import ScanSettingsBuilder, ScanSettings

# Use the low latency scan mode: faster detection at the cost of more power usage
settings = ScanSettingsBuilder().setScanMode(ScanSettings.SCAN_MODE_LOW_LATENCY)
# Start the scan with the custom settings
BluetoothDispatcher().start_scan(settings=settings)

Scan filters#

from able import BluetoothDispatcher
from able.filters import (
    DeviceAddressFilter,
    DeviceNameFilter,
    ManufacturerDataFilter,
    ServiceDataFilter,
    ServiceUUIDFilter
)

# Dispatcher used for both scans below
ble = BluetoothDispatcher()

# Start scanning for devices that are named either "Device1" or "Device2"
ble.start_scan(filters=[DeviceNameFilter("Device1"), DeviceNameFilter("Device2")])
ble.stop_scan()

# Start scanning for devices that advertise the "180f" service
# and are named either "Device1" or "Device2"
# (filters are combined with `&`; the list entries are alternatives)
ble.start_scan(filters=[
    ServiceUUIDFilter('0000180f-0000-1000-8000-00805f9b34fb') & DeviceNameFilter("Device1"),
    ServiceUUIDFilter('0000180f-0000-1000-8000-00805f9b34fb') & DeviceNameFilter("Device2")
])

Adapter state#

"""Detect and log Bluetooth adapter state change."""

from typing import Optional

from kivy.app import App
from kivy.logger import Logger
from kivy.uix.widget import Widget

from able import AdapterState, BluetoothDispatcher


class Dispatcher(BluetoothDispatcher):
    """Dispatcher that logs every Bluetooth adapter state change."""

    def on_bluetooth_adapter_state_change(self, state: int):
        """Log the new adapter state, with an extra record for the OFF state.

        `state` is converted with `AdapterState(state)` to get its name.
        """
        Logger.info(
            f"Bluetooth adapter state changed to {state} ('{AdapterState(state).name}')."
        )
        if state == AdapterState.OFF:
            Logger.info("Adapter state changed to OFF.")


class StateChangeApp(App):
    """Application that only hosts the state-logging dispatcher."""

    def build(self):
        """Create the dispatcher and return an empty root widget."""
        # Keep the dispatcher referenced for the lifetime of the app
        self.ble_dispatcher = Dispatcher()
        return Widget()


if __name__ == "__main__":
    # `run` is an instance method: the app has to be instantiated first
    StateChangeApp().run()

Advertising#

Set and advertise device name#

from able import BluetoothDispatcher
from able.advertising import Advertiser, AdvertiseData, DeviceName

ble = BluetoothDispatcher()
# Assign the new name through the dispatcher's `name` attribute
ble.name = "New test device name"

# It takes time for the new name to take effect:
# real code must wait and check the name before relying on it
print(f"New device name is set: {ble.name}")

# Build an advertiser whose data includes the device name
# NOTE(review): the advertiser is only constructed here, `start()` is not
# called - confirm whether that is intended for this snippet.
Advertiser(
    ble=ble,
    data=AdvertiseData(DeviceName())
)

Battery service data#

"""Advertise battery level, that degrades every second."""
from kivy.app import App
from kivy.clock import Clock
from kivy.uix.label import Label

from able import BluetoothDispatcher
from able import advertising

# Standard fully-qualified UUID for the Battery Service
BATTERY_SERVICE_UUID = "0000180f-0000-1000-8000-00805f9b34fb"


class BatteryAdvertiser(advertising.Advertiser):
    """Advertiser that prints the advertising start and stop events."""

    def on_advertising_started(self, advertising_set, tx_power, status):
        """Print whether advertising started successfully."""
        if status != advertising.Status.SUCCESS:
            print(f"Advertising start error status: {status}")
            return
        print("Advertising is started successfully")

    def on_advertising_stopped(self, advertising_set):
        """Print a notice that advertising has stopped."""
        print("Advertising stopped")


class BatteryLabel(Label):
    """Label that drives the advertiser and displays the advertised level."""

    def __init__(self):
        # Starts at 0, so that the first update wraps around to 100
        self._level = 0
        super().__init__(text="Waiting for advertising to start...")
        dispatcher = BluetoothDispatcher()
        initial_data = self.construct_data(level=100)
        self.advertiser = BatteryAdvertiser(
            ble=dispatcher,
            data=initial_data,
            interval=advertising.Interval.MIN,
        )
        # Get notified when advertising starts, then start it
        self.advertiser.bind(on_advertising_started=self.on_started)
        self.advertiser.start()

    def on_started(self, advertiser, advertising_set, tx_power, status):
        """Begin the once-a-second level updates after a successful start."""
        if status != advertising.Status.SUCCESS:
            return
        self.clock = Clock.schedule_interval(self.update_level, 1)

    def update_level(self, dt):
        """Decrease the level by one; stop advertising when it reaches zero."""
        remaining = (self._level - 1) % 101
        self._level = remaining
        self.text = str(remaining)

        if remaining > 0:
            # Replace the advertised data with the new level
            self.advertiser.data = self.construct_data(remaining)
            return

        # Level is exhausted: no more updates, no more advertising
        self.clock.cancel()
        self.advertiser.stop()

    def construct_data(self, level):
        """Return advertise data with device name, TX power and battery level."""
        parts = [
            advertising.DeviceName(),
            advertising.TXPowerLevel(),
            advertising.ServiceData(BATTERY_SERVICE_UUID, [level]),
        ]
        return advertising.AdvertiseData(*parts)


class BatteryApp(App):
    """Application whose root widget is a `BatteryLabel`."""

    def build(self):
        """Create and return the root label."""
        root_label = BatteryLabel()
        return root_label


if __name__ == "__main__":
    # Start the application only when executed as a script
    BatteryApp().run()

Use iBeacon advertising format#

import uuid
from able import BluetoothDispatcher
from able.advertising import Advertiser, AdvertiseData, ManufacturerData


# Manufacturer specific data, laid out in the iBeacon format
data = AdvertiseData(
    ManufacturerData(
        0x4C,  # Apple Manufacturer ID
        bytes([
            0x2, # SubType: Custom Manufacturer Data
            0x15 # Subtype length: 21 bytes follow (16 bytes UUID + 5 bytes below)
        ]) +
        uuid.uuid4().bytes +  # UUID of beacon (random, 16 bytes)
        bytes([
            0, 15,  # Major value
            0, 1,  # Minor value
            10  # RSSI, dBm at 1m
        ]))
)

# Create the advertiser for the data and start advertising
Advertiser(BluetoothDispatcher(), data).start()

Android Services#

BLE devices scanning service#

main.py

"""Start BLE devices scanning service."""
from able import (
    BluetoothDispatcher,
    require_bluetooth_enabled,
)
from jnius import autoclass
from kivy.app import App
from kivy.lang import Builder


# Kivy language markup of the root widget:
# two buttons that call the dispatcher's `start_service` / `stop_service`
kv = """
BoxLayout:
   Button:
      text: 'Start service'
      on_press: app.ble_dispatcher.start_service()
   Button:
      text: 'Stop service'
      on_press: app.ble_dispatcher.stop_service()
"""


class Dispatcher(BluetoothDispatcher):
    """Dispatcher that starts and stops the scanning service."""

    @property
    def service(self):
        """Service class, loaded with `autoclass`."""
        service_class = autoclass("test.able.scanservice.ServiceAble")
        return service_class

    @property
    def activity(self):
        """The `mActivity` attribute of the `PythonActivity` class."""
        python_activity = autoclass("org.kivy.android.PythonActivity")
        return python_activity.mActivity

    # The adapter has to be turned on and the permissions obtained,
    # before the service is started
    @require_bluetooth_enabled
    def start_service(self):
        """Start the service with an empty argument, then stop the app."""
        self.service.start(self.activity, "")
        # The app can be closed, the service will continue to run
        running_app = App.get_running_app()
        running_app.stop()

    def stop_service(self):
        """Stop the service."""
        service_class = self.service
        service_class.stop(self.activity)


class ServiceApp(App):
    """Application with the buttons to start and stop the service."""

    def build(self):
        """Create the dispatcher used by the kv markup and load the markup."""
        self.ble_dispatcher = Dispatcher()
        root_widget = Builder.load_string(kv)
        return root_widget


if __name__ == "__main__":
    # Start the application only when executed as a script
    ServiceApp().run()

service.py

"""Service to run BLE scan for 60 seconds,
and log each `on_device` event.
"""
import time

from able import BluetoothDispatcher
from kivy.logger import Logger


class BLE(BluetoothDispatcher):
    """Dispatcher that logs found devices and reported errors."""

    def on_device(self, device, rssi, advertisement):
        """Log the device name, or its address when the name is empty."""
        device_name = device.getName()
        title = device_name if device_name else device.getAddress()
        Logger.info("BLE Device found: %s", title)

    def on_error(self, msg):
        """Log the error message."""
        Logger.error("BLE Error %s", msg)


def main():
    """Run a BLE scan for 60 seconds, then stop it."""
    scan_seconds = 60
    scanner = BLE()
    scanner.start_scan()
    # Keep the process alive while the scan is running
    time.sleep(scan_seconds)
    scanner.stop_scan()


if __name__ == "__main__":
    # Entry point of the service script
    main()

Full example code: service_scan

Advertising service#

main.py

"""Start advertising service."""
from able import BluetoothDispatcher, Permission, require_bluetooth_enabled
from jnius import autoclass
from kivy.app import App
from kivy.lang import Builder


# Kivy language markup of the root widget:
# two buttons that call the dispatcher's `start_service` / `stop_service`
kv = """
BoxLayout:
   Button:
      text: 'Start service'
      on_press: app.ble_dispatcher.start_service()
   Button:
      text: 'Stop service'
      on_press: app.ble_dispatcher.stop_service()
"""


class Dispatcher(BluetoothDispatcher):
    """Dispatcher that starts and stops the advertising service."""

    @property
    def service(self):
        """Service class, loaded with `autoclass`."""
        service_class = autoclass("test.able.advservice.ServiceAble")
        return service_class

    @property
    def activity(self):
        """The `mActivity` attribute of the `PythonActivity` class."""
        python_activity = autoclass("org.kivy.android.PythonActivity")
        return python_activity.mActivity

    # The adapter has to be turned on, before the service is started
    @require_bluetooth_enabled
    def start_service(self):
        """Start the service, passing the UUID to advertise; then stop the app."""
        # UUID to advertise, passed to the service as its argument
        uuid_to_advertise = "bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb"
        self.service.start(self.activity, uuid_to_advertise)
        # The app can be closed, the service will continue running
        running_app = App.get_running_app()
        running_app.stop()

    def stop_service(self):
        """Stop the service."""
        service_class = self.service
        service_class.stop(self.activity)


class ServiceApp(App):
    """Application with the buttons to start and stop the service."""

    def build(self):
        """Create the dispatcher with a reduced permissions list; load the markup."""
        # Device scanning is not used by this app,
        # so only the connect and advertise permissions are requested
        required_permissions = [
            Permission.BLUETOOTH_CONNECT,
            Permission.BLUETOOTH_ADVERTISE,
        ]
        self.ble_dispatcher = Dispatcher(runtime_permissions=required_permissions)
        root_widget = Builder.load_string(kv)
        return root_widget


if __name__ == "__main__":
    # Start the application only when executed as a script
    ServiceApp().run()

service.py

"""Service to advertise data, while not stopped."""
import time
from os import environ

from able import BluetoothDispatcher
from able.advertising import (
    Advertiser,
    AdvertiseData,
    ServiceUUID,
)


def main():
    """Advertise a service UUID and keep the process alive.

    The UUID is read from the `PYTHON_SERVICE_ARGUMENT` environment variable,
    with a fallback value used when the variable is not set.
    """
    service_uuid = environ.get(
        "PYTHON_SERVICE_ARGUMENT", "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"
    )
    dispatcher = BluetoothDispatcher()
    payload = AdvertiseData(ServiceUUID(service_uuid))
    advertiser = Advertiser(ble=dispatcher, data=payload)
    advertiser.start()
    # Sleep forever, in long intervals, so that the script never returns
    while True:
        time.sleep(0xDEAD)


if __name__ == "__main__":
    # Entry point of the service script
    main()

Full example code: service_advertise

Connect to multiple devices#

"""Scan for devices with name "KivyBLETest",
connect and periodically read connected devices RSSI.

Multiple `BluetoothDispatcher` objects are used:
    one for the scanning process and one for every connected device.
"""
from able import GATT_SUCCESS, BluetoothDispatcher
from able.filters import DeviceNameFilter
from kivy.app import App
from kivy.clock import Clock
from kivy.logger import Logger
from kivy.uix.label import Label


class DeviceDispatcher(BluetoothDispatcher):
    """Dispatcher to control a single BLE device."""

    def __init__(self, device: "BluetoothDevice"):
        """Remember the device together with its address and name."""
        super().__init__()
        self._device = device
        self._address: str = device.getAddress()
        # The name can be empty; fall back to an empty string
        self._name: str = device.getName() or ""

    @property
    def title(self) -> str:
        """Device label for log messages: address and name."""
        return f"<{self._address}><{self._name}>"

    def on_connection_state_change(self, status: int, state: int):
        """Log the connection state; on disconnection schedule a reconnect."""
        if status == GATT_SUCCESS and state:
            Logger.info(f"Device: {self.title} connected")
        else:
            Logger.info(f"Device: {self.title} disconnected. {status=}, {state=}")
            self.close_gatt()
            # Try to connect again in 15 seconds
            Clock.schedule_once(callback=lambda dt: self.reconnect(), timeout=15)

    def on_rssi_updated(self, rssi: int, status: int):
        """Log the RSSI value, or an error when the read has failed."""
        if status == GATT_SUCCESS:
            Logger.info(f"Device: {self.title} RSSI: {rssi}")
        else:
            # The value is not reliable when the operation did not succeed
            Logger.error(f"Device: {self.title} RSSI read failed. {status=}")

    def periodically_update_rssi(self):
        """
        Clock callback to read
        the signal strength indicator for a connected device.
        """
        if self.gatt:  # if device is connected
            self.update_rssi()

    def reconnect(self):
        """Log the attempt and connect to the device again."""
        Logger.info(f"Device: {self.title} try to reconnect ...")
        self.connect_gatt(self._device)

    def start(self):
        """Start connection to device."""
        if not self.gatt:
            self.connect_gatt(self._device)
            # Request the RSSI update every 5 seconds
            Clock.schedule_interval(
                callback=lambda dt: self.periodically_update_rssi(), timeout=5
            )


class ScannerDispatcher(BluetoothDispatcher):
    """Dispatcher that handles the scan events and registers found devices."""

    def __init__(self):
        super().__init__()
        # Device address -> dispatcher that was created for the device
        self._devices: dict[str, DeviceDispatcher] = {}

    def on_scan_started(self, success: bool):
        """Log whether the scan has started."""
        if not success:
            Logger.error("Scan: error on start")
            return
        Logger.info("Scan: started")

    def on_scan_completed(self):
        """Log the scan completion."""
        Logger.info("Scan: completed")

    def on_device(self, device, rssi, advertisement):
        """Create and start a dispatcher for a device that is seen first time."""
        address = device.getAddress()
        if address in self._devices:
            # Device already has a dispatcher, do not create another one
            return

        device_dispatcher = DeviceDispatcher(device)
        # The address is registered before the dispatcher is started
        self._devices[address] = device_dispatcher
        Logger.info(f"Scan: device <{address}> added")
        device_dispatcher.start()


class MultiDevicesApp(App):
    """Application that scans for "KivyBLETest" devices and shows its own name."""

    def build(self):
        """Start the filtered scan and return a label with the app name."""
        # Keep the scanner referenced for the lifetime of the app:
        # it owns the registry of the per-device dispatchers.
        self.scanner = ScannerDispatcher()
        self.scanner.start_scan(filters=[DeviceNameFilter("KivyBLETest")])
        return Label(text=self.name)


if __name__ == "__main__":
    MultiDevicesApp().run()

Full example code: multi_devices