Write the Python Application

The full code for the project is available on Github
Let's write the code to get the bluetooth radio advertising our service, allowing centrals to connect, and notifying those centrals when the temperature value changes. The full code is available on Github, but we'll look at some of the main components here.
We'll run the application from the command line of the Raspberry Pi with this command:
docker run -it --rm \
--privileged \
--workdir /app \
--volume "$PWD":/app \
--volume /var/run/dbus/:/var/run/dbus/:z \
soodesune/python3-bluetooth BTLE_manager.py
BTLE_Manager.py
The first section of code is going to run our application in test-mode if it's run as a stand alone application. We'll have it broadcast some random values to centrals that are listening.
if __name__ == '__main__':
bm = BluetoothManager()
# Put out some random values for debugging / testing purposes
#
while True:
bm.write_value(
[choice([0x01, 0x02, 0x03, 0x04, 0xAB, 0xDE])]
)
sleep(0.5)
Now let's look at what BluetoothManager
is doing.
class BluetoothManager:
def __init__(self):
bus = SystemMessageBus()
Thread(target=EventLoop().run).start()
bus.register_service("com.raspberrypi-bluetooth.Thermometer")
proxy = bus.get_proxy("org.bluez", "/org/bluez/hci0")
proxy.Set("org.bluez.Adapter1", "Powered", Variant("b", True))
print("Adapter powered: %s" % proxy.Get("org.bluez.Adapter1", "Powered"))
# We don't need pairing for this application
proxy.Set("org.bluez.Adapter1", "Pairable", Variant("b", False))
bus.publish_object(
Advertisement.PATH,
Advertisement().for_publication()
)
proxy.RegisterAdvertisement(Advertisement.PATH, {})
self.characteristic = Characteristic().for_publication()
bus.publish_object(
Characteristic.PATH,
self.characteristic
)
bus.publish_object(
Application.PATH,
Application().for_publication()
)
proxy.RegisterApplication(Application.PATH, {})
def write_value(self, value: List[Byte]) -> None:
self.characteristic.WriteValue(value, {})
Pretty simple, we're:
- Getting a
proxy
to the Bluetooth device (hci0
) and configuring some of it's settings. - Publishing our application components on D-Bus so that BlueZ can communicate back with our application.
- Adding a
write_value
function, so that the MLX-90164 has a way of publishing it's temperature readings.
Advertisement.py
class Advertisement(Publishable):
PATH = "/com/raspberrypi-bluetooth/Thermometer/Advertisement/1"
def for_publication(self):
return AdvertisementInterface(self)
@dbus_interface("org.bluez.LEAdvertisement1")
class AdvertisementInterface(InterfaceTemplate):
def Release(self) -> None:
print("released")
@property
def Type(self) -> Str:
return "peripheral"
@property
def ServiceUUIDS(self) -> List[Str]:
return [Service.UUID]
@property
def Discoverable(self) -> Bool:
return True
@property
def Includes(self) -> List[Str]:
return ["tx-power"]
@property
def LocalName(self) -> Str:
return "RasberryPi-Bluetooth.com Thermometer"
# https://specificationrefs.bluetooth.com/assigned-values/Appearance%20Values.pdf
@property
def Appearance(self) -> UInt16:
return 768 # Thermometer
@property
def MinInterval(self) -> UInt32:
return 0
@property
def MaxInterval(self) -> UInt32:
return 0
There's a pattern here that all of our application code is going to follow. We have a model class (Advertisement
) that inherits from the Publishable
class provided by Dasbus. The model class will have a method called for_publication
that delegates to an interface class that inherits from InterfaceTemplate
, which is also provided by the Dasbus library. The interface class is how D-Bus is going to see and interact with our application, while the model class is going to be where we do our internal logic.
Application.py
and Service.py
aren't that interesting. You can look at them on Github.
Characteristic.py
class Characteristic(Publishable):
PATH = "/com/raspberrypi-bluetooth/Thermometer/Characteristic/1"
UUID = "4116f8d2-9f66-4f58-a53d-fc7440e7c14e" # hex chars only!
# Will improve this later. Other problems to solve now.
_SERVICE_PATH = "/com/raspberrypi-bluetooth/Thermometer/Service/1"
def __init__(self):
self._value = [None]
super().__init__()
def for_publication(self):
return CharacteristicInterface(self)
@property
def value(self):
return self._value
@value.setter
def value(self, value):
self._value = value
@dbus_interface("org.bluez.GattCharacteristic1")
class CharacteristicInterfaceGatt(InterfaceTemplate):
def ReadValue(self, options: Dict[Str, Variant]) -> List[Byte]:
return self.implementation.value
@emits_properties_changed
def WriteValue(self, value: List[Byte], options: Dict[Str, Str]) -> None:
self.implementation.value = value
self.report_changed_property('Value')
@property
def UUID(self) -> Str:
return Characteristic.UUID
@property
def Service(self) -> ObjPath:
return Characteristic._SERVICE_PATH
@property
def Value(self) -> List[Byte]:
return self.implementation.value
class CharacteristicInterface(CharacteristicInterfaceGatt):
_UUID = Variant("s", Characteristic.UUID)
def get_properties(self):
return {
"org.bluez.GattCharacteristic1": {
"Service": Variant("o", Characteristic._SERVICE_PATH),
"UUID": self._UUID,
"Flags": Variant.new_array(None, [Variant("s", "read"), Variant("s", "notify")]),
"Descriptors": Variant.new_array(VariantType("t"), []),
}
}
In our model Characteristic.py
you can see we're keeping track of the temperature value from the MLX-90614. The interface class CharacteristicInterfaceGatt
exposes an interface to D-Bus that allows reading and writing of the thermometer value. Finally, CharacteristicInterface
publishes the Flags
so that central devices know they can read or get notifications when this characteristic changes.