Adding in context manager for Connection and Peer classes

* Connection implements async context manager to disconnect when
  context is left
    * The Connection only calls disconnect if the context manager exits
      without an exception
* Peer implements async context manager to discover when entering the
  context
* Device.connect_as_gatt implements an async context manager to nest the
  connection and peer context managers
* Added HCI_StatusError that can be raised when a HCI Command Status
  event is received that doesn't show "PENDING" as status
* Added Connection.sustain to wait for a timeout or disconnect
* Peer.sustain also maps to Connectin.sustain
* Updated battery_client.py to use .connect_as_gatt and .sustain
* Updated heart_rate_client.py to use .connect_as_gatt and .sustain
This commit is contained in:
Jayson Messenger
2022-07-26 10:34:37 -04:00
parent 472702a9d9
commit 9cd1890faa
4 changed files with 97 additions and 46 deletions
+15 -19
View File
@@ -43,28 +43,24 @@ async def main():
# Connect to the peer
target_address = sys.argv[2]
print(f'=== Connecting to {target_address}...')
connection = await device.connect(target_address)
print(f'=== Connected to {connection}')
async with device.connect_as_gatt(target_address) as peer:
print(f'=== Connected to {peer}')
battery_service = peer.create_service_proxy(BatteryServiceProxy)
# Discover the Battery Service
peer = Peer(connection)
print('=== Discovering Battery Service')
battery_service = await peer.discover_service_and_create_proxy(BatteryServiceProxy)
# Check that the service was found
if not battery_service:
print('!!! Service not found')
return
# Check that the service was found
if not battery_service:
print('!!! Service not found')
return
# Subscribe to and read the battery level
if battery_service.battery_level:
await battery_service.battery_level.subscribe(
lambda value: print(f'{color("Battery Level Update:", "green")} {value}')
)
value = await battery_service.battery_level.read_value()
print(f'{color("Initial Battery Level:", "green")} {value}')
# Subscribe to and read the battery level
if battery_service.battery_level:
await battery_service.battery_level.subscribe(
lambda value: print(f'{color("Battery Level Update:", "green")} {value}')
)
value = await battery_service.battery_level.read_value()
print(f'{color("Initial Battery Level:", "green")} {value}')
await hci_source.wait_for_termination()
await peer.sustain()
# -----------------------------------------------------------------------------
+17 -20
View File
@@ -43,31 +43,28 @@ async def main():
# Connect to the peer
target_address = sys.argv[2]
print(f'=== Connecting to {target_address}...')
connection = await device.connect(target_address)
print(f'=== Connected to {connection}')
async with device.connect_as_gatt(target_address) as peer:
print(f'=== Connected to {peer}')
# Discover the Heart Rate Service
peer = Peer(connection)
print('=== Discovering Heart Rate Service')
heart_rate_service = await peer.discover_service_and_create_proxy(HeartRateServiceProxy)
heart_rate_service = peer.create_service_proxy(HeartRateServiceProxy)
# Check that the service was found
if not heart_rate_service:
print('!!! Service not found')
return
# Check that the service was found
if not heart_rate_service:
print('!!! Service not found')
return
# Read the body sensor location
if heart_rate_service.body_sensor_location:
location = await heart_rate_service.body_sensor_location.read_value()
print(color('Sensor Location:', 'green'), location)
# Read the body sensor location
if heart_rate_service.body_sensor_location:
location = await heart_rate_service.body_sensor_location.read_value()
print(color('Sensor Location:', 'green'), location)
# Subscribe to the heart rate measurement
if heart_rate_service.heart_rate_measurement:
await heart_rate_service.heart_rate_measurement.subscribe(
lambda value: print(f'{color("Heart Rate Measurement:", "green")} {value}')
)
# Subscribe to the heart rate measurement
if heart_rate_service.heart_rate_measurement:
await heart_rate_service.heart_rate_measurement.subscribe(
lambda value: print(f'{color("Heart Rate Measurement:", "green")} {value}')
)
await hci_source.wait_for_termination()
await peer.sustain()
# -----------------------------------------------------------------------------