Usage Examples
Contents
Usage Examples#
Alert#
"""Turn the alert on Mi Band device
"""
from kivy.app import App
from kivy.uix.button import Button
from able import BluetoothDispatcher, GATT_SUCCESS
from error_message import install_exception_handler
class BLE(BluetoothDispatcher):
    """Dispatcher that finds a Mi Band device and turns its alert on.

    `device` holds the device remembered from the scan and
    `alert_characteristic` holds the discovered "Alert Level" characteristic;
    both start as `None`.
    """

    device = alert_characteristic = None

    def start_alert(self, *args, **kwargs):
        """Send the alert, reconnecting or scanning first when necessary."""
        if self.alert_characteristic:  # alert service is already discovered
            self.alert(self.alert_characteristic)
        elif self.device:  # device was already found during the scan
            self.connect_gatt(self.device)  # reconnect
        else:
            self.stop_scan()  # stop previous scan
            self.start_scan()  # start a scan for devices

    def on_device(self, device, rssi, advertisement):
        """Remember the first device whose name starts with 'MI' and stop scanning."""
        # some device is found during the scan
        name = device.getName()
        if name and name.startswith('MI'):  # is a Mi Band device
            self.device = device
            self.stop_scan()

    def on_scan_completed(self):
        """Connect to the remembered device, if the scan found one."""
        if self.device:
            self.connect_gatt(self.device)  # connect to device

    def on_connection_state_change(self, status, state):
        """Discover services on connect; reset state and close on anything else."""
        if status == GATT_SUCCESS and state:  # connection established
            self.discover_services()  # discover what services a device offer
        else:  # disconnection or error
            self.alert_characteristic = None
            self.close_gatt()  # close current connection

    def on_services(self, status, services):
        """Look up the "Alert Level" characteristic and trigger the alert.

        Nothing is written when the discovery failed or the characteristic
        is not offered by the device.
        """
        if status != GATT_SUCCESS:  # discovery failed, nothing to search
            return
        # 0x2a06 is a standard code for "Alert Level" characteristic
        # https://www.bluetooth.com/specifications/gatt/viewer?attributeXmlFile=org.bluetooth.characteristic.alert_level.xml
        self.alert_characteristic = services.search('2a06')
        if self.alert_characteristic:  # do not write to a missing characteristic
            self.alert(self.alert_characteristic)

    def alert(self, characteristic):
        """Write the alert level to the given characteristic."""
        self.write_characteristic(characteristic, [2])  # 2 is for "High Alert"
class AlertApp(App):
    """Single-button application: pressing the button alerts the Mi Band."""

    def build(self):
        """Create the root button; the dispatcher is created on first press."""
        self.ble = None
        alert_button = Button(text='Press to Alert Mi', on_press=self.start_alert)
        return alert_button

    def start_alert(self, *args, **kwargs):
        """Button callback: create the dispatcher once, then start the alert."""
        self.ble = self.ble or BLE()
        self.ble.start_alert()


if __name__ == '__main__':
    install_exception_handler()
    app = AlertApp()
    app.run()
Full example code: alert
Change MTU#
"""Request MTU change, and write 100 bytes to a characteristic."""
from kivy.app import App
from kivy.clock import Clock
from kivy.logger import Logger
from kivy.uix.widget import Widget
from able import BluetoothDispatcher, GATT_SUCCESS
class BLESender(BluetoothDispatcher):
    """Connect by address, request a bigger MTU, then write 100 bytes at once."""

    def __init__(self):
        super().__init__()
        # Set in `on_services`, used in `on_mtu_changed`
        self.characteristic_to_write = None
        Clock.schedule_once(self.connect, 0)

    def connect(self, _):
        """Clock callback: start the connection to the hard-coded address."""
        self.connect_by_device_address("FF:FF:FF:FF:FF:FF")

    def on_connection_state_change(self, status, state):
        """Start services discovery as soon as the connection is established."""
        if status == GATT_SUCCESS and state:
            self.discover_services()

    def on_services(self, status, services):
        """Remember the target characteristic and request the MTU change."""
        if status != GATT_SUCCESS:
            return
        self.characteristic_to_write = services.search("0d03")
        if not self.characteristic_to_write:
            # Without the characteristic there is nothing to write to,
            # so the MTU request would be pointless
            Logger.error("Characteristic to write is not found")
            return
        # Need to request 100 + 3 extra bytes for ATT packet header
        self.request_mtu(103)

    def on_mtu_changed(self, mtu, status):
        """Write the 100 bytes payload when the requested MTU is granted."""
        if status == GATT_SUCCESS and mtu == 103:
            Logger.info("MTU changed: now it is possible to send 100 bytes at once")
            self.write_characteristic(self.characteristic_to_write, range(100))
        else:
            Logger.error("MTU not changed: mtu=%d, status=%d", mtu, status)

    def on_characteristic_write(self, characteristic, status):
        """Log the result of the write operation."""
        if status == GATT_SUCCESS:
            Logger.info("Characteristic write succeed")
        else:
            Logger.error("Write status: %d", status)
class MTUApp(App):
    """Application with an empty root widget that creates the `BLESender`."""

    def build(self):
        """Create the sender (its constructor schedules the connection)."""
        BLESender()
        root = Widget()
        return root


if __name__ == '__main__':
    app = MTUApp()
    app.run()
Scan settings#
from able import BluetoothDispatcher
from able.scan_settings import ScanSettingsBuilder, ScanSettings
# Low latency scan mode: faster detection at the cost of more power usage
builder = ScanSettingsBuilder()
settings = builder.setScanMode(ScanSettings.SCAN_MODE_LOW_LATENCY)
ble = BluetoothDispatcher()
ble.start_scan(settings=settings)
Scan filters#
from able import BluetoothDispatcher
from able.filters import (
DeviceAddressFilter,
DeviceNameFilter,
ManufacturerDataFilter,
ServiceDataFilter,
ServiceUUIDFilter
)
ble = BluetoothDispatcher()
device_names = ("Device1", "Device2")
battery_service_uuid = '0000180f-0000-1000-8000-00805f9b34fb'

# Filters in the list are alternatives:
# scan for devices that have one of names: "Device1" or "Device2"
name_filters = [DeviceNameFilter(name) for name in device_names]
ble.start_scan(filters=name_filters)
ble.stop_scan()

# Conditions joined with `&` must all hold:
# device advertises "180f" service and has one of names: "Device1" or "Device2"
service_and_name_filters = [
    ServiceUUIDFilter(battery_service_uuid) & DeviceNameFilter(name)
    for name in device_names
]
ble.start_scan(filters=service_and_name_filters)
Adapter state#
"""Detect and log Bluetooth adapter state change."""
from typing import Optional

from kivy.app import App
from kivy.logger import Logger
from kivy.uix.widget import Widget

from able import AdapterState, BluetoothDispatcher
class Dispatcher(BluetoothDispatcher):
    """Dispatcher that logs every Bluetooth adapter state change."""

    def on_bluetooth_adapter_state_change(self, state: int):
        """Log the new adapter state, with an extra record for the OFF state.

        :param state: numeric adapter state; its name is resolved
            with `AdapterState(state)`.
        """
        Logger.info(
            f"Bluetooth adapter state changed to {state} ('{AdapterState(state).name}')."
        )
        if state == AdapterState.OFF:
            Logger.info("Adapter state changed to OFF.")
class StateChangeApp(App):
    """Application with an empty root widget that creates the `Dispatcher`."""

    def build(self):
        """Create the dispatcher and return an empty root widget."""
        Dispatcher()
        return Widget()


if __name__ == "__main__":
    # `run` is an instance method: the application has to be instantiated first
    StateChangeApp().run()
Advertising#
Advertise with data and additional (scannable) data#
from able import BluetoothDispatcher
from able.advertising import (
Advertiser,
AdvertiseData,
ManufacturerData,
Interval,
ServiceUUID,
ServiceData,
TXPower,
)
ble = BluetoothDispatcher()
# Data of the advertisement itself
service_data = AdvertiseData(ServiceUUID("aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"))
# Additional (scannable) data
manufacturer_data = AdvertiseData(ManufacturerData(id=0xAABB, data=b"some data"))

advertiser = Advertiser(
    ble=ble,
    data=service_data,
    scan_data=manufacturer_data,
    interval=Interval.MEDIUM,
    tx_power=TXPower.MEDIUM,
)
advertiser.start()
Set and advertise device name#
from able import BluetoothDispatcher
from able.advertising import Advertiser, AdvertiseData, DeviceName
ble = BluetoothDispatcher()
ble.name = "New test device name"

# There must be a wait and check, it takes time for new name to take effect
print(f"New device name is set: {ble.name}")

# Advertise the device name; nothing is advertised until `start()` is called
Advertiser(
    ble=ble,
    data=AdvertiseData(DeviceName())
).start()
Battery service data#
"""Advertise battery level, that degrades every second."""
from kivy.app import App
from kivy.clock import Clock
from kivy.uix.label import Label
from able import BluetoothDispatcher
from able import advertising
# Standard fully-qualified UUID for the Battery Service
BATTERY_SERVICE_UUID = "0000180f-0000-1000-8000-00805f9b34fb"
class BatteryAdvertiser(advertising.Advertiser):
    """Advertiser that prints its start and stop events."""

    def on_advertising_started(self, advertising_set, tx_power, status):
        """Print whether the advertising start succeeded."""
        if status != advertising.Status.SUCCESS:
            print(f"Advertising start error status: {status}")
            return
        print("Advertising is started successfully")

    def on_advertising_stopped(self, advertising_set):
        """Print a notice that the advertising is stopped."""
        print("Advertising stopped")
class BatteryLabel(Label):
    """Widget to control advertiser and show current battery level."""

    def __init__(self):
        # Starts at 0, so the first `update_level` call wraps around to 100
        self._level = 0
        super().__init__(text="Waiting for advertising to start...")
        ble = BluetoothDispatcher()
        initial_data = self.construct_data(level=100)
        self.advertiser = BatteryAdvertiser(
            ble=ble,
            data=initial_data,
            interval=advertising.Interval.MIN,
        )
        # Get notified when the advertising is started
        self.advertiser.bind(on_advertising_started=self.on_started)
        self.advertiser.start()

    def on_started(self, advertiser, advertising_set, tx_power, status):
        """Begin the once-per-second level updates after a successful start."""
        if status != advertising.Status.SUCCESS:
            return
        self.clock = Clock.schedule_interval(self.update_level, 1)

    def update_level(self, dt):
        """Clock callback: lower the level by one, show it and advertise it.

        When the level reaches 0 the clock event is cancelled
        and the advertising is stopped.
        """
        self._level = (self._level - 1) % 101
        level = self._level
        self.text = str(level)
        if level <= 0:
            self.clock.cancel()
            # Stop advertising
            self.advertiser.stop()
            return
        # Set new advertising data
        self.advertiser.data = self.construct_data(level)

    def construct_data(self, level):
        """Build advertise data: device name, TX power level and battery level."""
        name = advertising.DeviceName()
        tx_power_level = advertising.TXPowerLevel()
        battery = advertising.ServiceData(BATTERY_SERVICE_UUID, [level])
        return advertising.AdvertiseData(name, tx_power_level, battery)
class BatteryApp(App):
    """Application whose root widget is a `BatteryLabel`."""

    def build(self):
        """Return the label that drives the advertiser."""
        root = BatteryLabel()
        return root


if __name__ == "__main__":
    app = BatteryApp()
    app.run()
Use iBeacon advertising format#
import uuid
from able import BluetoothDispatcher
from able.advertising import Advertiser, AdvertiseData, ManufacturerData
# Manufacturer specific payload: prefix + beacon UUID + major/minor/RSSI
beacon_prefix = bytes([
    0x2,  # SubType: Custom Manufacturer Data
    0x15,  # Subtype length: 21 bytes follow (16 UUID + 2 major + 2 minor + 1 RSSI)
])
beacon_uuid = uuid.uuid4().bytes  # UUID of beacon
beacon_suffix = bytes([
    0, 15,  # Major value
    0, 1,  # Minor value
    10,  # RSSI, dBm at 1m
])
data = AdvertiseData(
    ManufacturerData(
        0x4C,  # Apple Manufacturer ID
        beacon_prefix + beacon_uuid + beacon_suffix,
    )
)
Advertiser(BluetoothDispatcher(), data).start()
Android Services#
BLE devices scanning service#
main.py
"""Start BLE devices scanning service."""
from able import (
BluetoothDispatcher,
require_bluetooth_enabled,
)
from jnius import autoclass
from kivy.app import App
from kivy.lang import Builder
# Kv markup is indentation-sensitive:
# both buttons are children of the root BoxLayout
kv = """
BoxLayout:
    Button:
        text: 'Start service'
        on_press: app.ble_dispatcher.start_service()
    Button:
        text: 'Stop service'
        on_press: app.ble_dispatcher.stop_service()
"""
class Dispatcher(BluetoothDispatcher):
    """Dispatcher that starts and stops the scanning service."""

    @property
    def service(self):
        """Java class of the service, resolved with `autoclass`."""
        service_class = autoclass("test.able.scanservice.ServiceAble")
        return service_class

    @property
    def activity(self):
        """Value of the `mActivity` field of the `PythonActivity` class."""
        python_activity = autoclass("org.kivy.android.PythonActivity")
        return python_activity.mActivity

    # Need to turn on the adapter and obtain permissions, before service is started
    @require_bluetooth_enabled
    def start_service(self):
        """Start the service with an empty argument, then stop the app."""
        self.service.start(self.activity, "")
        # Can close the app, service will continue to run
        running_app = App.get_running_app()
        running_app.stop()

    def stop_service(self):
        """Stop the service."""
        self.service.stop(self.activity)
class ServiceApp(App):
    """Application with the buttons to start and stop the service."""

    def build(self):
        """Create the dispatcher used by the kv callbacks and load the markup."""
        self.ble_dispatcher = Dispatcher()
        root = Builder.load_string(kv)
        return root


if __name__ == "__main__":
    app = ServiceApp()
    app.run()
service.py
"""Service to run BLE scan for 60 seconds,
and log each `on_device` event.
"""
import time
from able import BluetoothDispatcher
from kivy.logger import Logger
class BLE(BluetoothDispatcher):
    """Dispatcher that logs found devices and errors."""

    def on_device(self, device, rssi, advertisement):
        """Log the device name, or its address when the name is empty."""
        name = device.getName()
        title = name if name else device.getAddress()
        Logger.info("BLE Device found: %s", title)

    def on_error(self, msg):
        """Log the error message."""
        Logger.error("BLE Error %s", msg)
def main():
    """Scan for devices for 60 seconds, then stop the scan."""
    scan_seconds = 60
    scanner = BLE()
    scanner.start_scan()
    time.sleep(scan_seconds)
    scanner.stop_scan()


if __name__ == "__main__":
    main()
Full example code: service_scan
Advertising service#
main.py
"""Start advertising service."""
from able import BluetoothDispatcher, Permission, require_bluetooth_enabled
from jnius import autoclass
from kivy.app import App
from kivy.lang import Builder
# Kv markup is indentation-sensitive:
# both buttons are children of the root BoxLayout
kv = """
BoxLayout:
    Button:
        text: 'Start service'
        on_press: app.ble_dispatcher.start_service()
    Button:
        text: 'Stop service'
        on_press: app.ble_dispatcher.stop_service()
"""
class Dispatcher(BluetoothDispatcher):
    """Dispatcher that starts and stops the advertising service."""

    @property
    def service(self):
        """Java class of the service, resolved with `autoclass`."""
        service_class = autoclass("test.able.advservice.ServiceAble")
        return service_class

    @property
    def activity(self):
        """Value of the `mActivity` field of the `PythonActivity` class."""
        python_activity = autoclass("org.kivy.android.PythonActivity")
        return python_activity.mActivity

    # Need to turn on the adapter, before service is started
    @require_bluetooth_enabled
    def start_service(self):
        """Start the service with the UUID to advertise, then stop the app."""
        # Pass UUID to advertise
        self.service.start(self.activity, "bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb")
        # Can close the app, service will continue running
        running_app = App.get_running_app()
        running_app.stop()

    def stop_service(self):
        """Stop the service."""
        self.service.stop(self.activity)
class ServiceApp(App):
    """Application with the buttons to start and stop the service."""

    def build(self):
        """Create the dispatcher used by the kv callbacks and load the markup."""
        # This app does not use device scanning,
        # so the list of required permissions can be reduced
        permissions = [
            Permission.BLUETOOTH_CONNECT,
            Permission.BLUETOOTH_ADVERTISE,
        ]
        self.ble_dispatcher = Dispatcher(runtime_permissions=permissions)
        root = Builder.load_string(kv)
        return root


if __name__ == "__main__":
    app = ServiceApp()
    app.run()
service.py
"""Service to advertise data, while not stopped."""
import time
from os import environ
from able import BluetoothDispatcher
from able.advertising import (
Advertiser,
AdvertiseData,
ServiceUUID,
)
def main():
    """Advertise a service UUID until the process is stopped.

    The UUID is read from the `PYTHON_SERVICE_ARGUMENT` environment variable,
    with "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa" as the fallback value.
    """
    service_uuid = environ.get(
        "PYTHON_SERVICE_ARGUMENT",
        "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa",
    )
    ble = BluetoothDispatcher()
    advertise_data = AdvertiseData(ServiceUUID(service_uuid))
    advertiser = Advertiser(ble=ble, data=advertise_data)
    advertiser.start()
    # Sleep in long intervals, forever
    while True:
        time.sleep(0xDEAD)


if __name__ == "__main__":
    main()
Full example code: service_advertise
Connect to multiple devices#
"""Scan for devices with name "KivyBLETest",
connect and periodically read connected devices RSSI.
Multiple `BluetoothDispatcher` objects are used:
one for the scanning process and one for every connected device.
"""
from able import GATT_SUCCESS, BluetoothDispatcher
from able.filters import DeviceNameFilter
from kivy.app import App
from kivy.clock import Clock
from kivy.logger import Logger
from kivy.uix.label import Label
class DeviceDispatcher(BluetoothDispatcher):
    """Dispatcher to control a single BLE device."""

    def __init__(self, device: "BluetoothDevice"):
        super().__init__()
        self._device = device
        # Address and name are read once, they are used to build the log title
        self._address: str = device.getAddress()
        self._name: str = device.getName() or ""

    @property
    def title(self) -> str:
        """Device label for the log records: "<address><name>"."""
        return "<{}><{}>".format(self._address, self._name)

    def on_connection_state_change(self, status: int, state: int):
        """Log the connection state; on disconnect or error retry in 15 seconds."""
        if status == GATT_SUCCESS and state:
            Logger.info(f"Device: {self.title} connected")
            return
        Logger.info(f"Device: {self.title} disconnected. {status=}, {state=}")
        self.close_gatt()
        Clock.schedule_once(callback=lambda dt: self.reconnect(), timeout=15)

    def on_rssi_updated(self, rssi: int, status: int):
        """Log the RSSI value that was read."""
        Logger.info(f"Device: {self.title} RSSI: {rssi}")

    def periodically_update_rssi(self):
        """
        Clock callback to read
        the signal strength indicator for a connected device.
        """
        if not self.gatt:  # device is not connected
            return
        self.update_rssi()

    def reconnect(self):
        """Log the attempt and connect to the device again."""
        Logger.info(f"Device: {self.title} try to reconnect ...")
        self.connect_gatt(self._device)

    def start(self):
        """Start connection to device."""
        if not self.gatt:
            self.connect_gatt(self._device)
        # Try to read RSSI every 5 seconds
        Clock.schedule_interval(
            callback=lambda dt: self.periodically_update_rssi(), timeout=5
        )
class ScannerDispatcher(BluetoothDispatcher):
    """Dispatcher to control the scanning process."""

    def __init__(self):
        super().__init__()
        # Maps an address of a found device to the dispatcher created for it
        self._devices: dict[str, DeviceDispatcher] = {}

    def on_scan_started(self, success: bool):
        """Log whether the scan was started."""
        if not success:
            Logger.error("Scan: error on start")
            return
        Logger.info("Scan: started")

    def on_scan_completed(self):
        """Log the scan completion."""
        Logger.info("Scan: completed")

    def on_device(self, device, rssi, advertisement):
        """Create and start a dispatcher for a device that is seen the first time."""
        address = device.getAddress()
        if address in self._devices:
            # A dispatcher was already created for this device
            return
        device_dispatcher = DeviceDispatcher(device)
        # Remember address,
        # to avoid multiple dispatchers creation for this device
        self._devices[address] = device_dispatcher
        Logger.info(f"Scan: device <{address}> added")
        device_dispatcher.start()
class MultiDevicesApp(App):
    """Application that starts the scan for devices named "KivyBLETest"."""

    def build(self):
        """Start the filtered scan and return a label with the app name."""
        scanner = ScannerDispatcher()
        name_filter = DeviceNameFilter("KivyBLETest")
        scanner.start_scan(filters=[name_filter])
        return Label(text=self.name)


if __name__ == "__main__":
    app = MultiDevicesApp()
    app.run()
Full example code: multi_devices