forked from auracaster/bumble_mirror
Compare commits
6 Commits
jdm/connec
...
v0.0.6
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d7ce62beaa | ||
|
|
0e2a184edb | ||
|
|
e6ee5ae996 | ||
|
|
f1836e659f | ||
|
|
99218d3abf | ||
|
|
b5ba0bef63 |
23
README.md
23
README.md
@@ -21,6 +21,29 @@ or see the documentation source under `docs/mkdocs/src`, or build the static HTM
|
||||
mkdocs build -f docs/mkdocs/mkdocs.yml
|
||||
```
|
||||
|
||||
## Usage
|
||||
|
||||
### Getting Started
|
||||
|
||||
For a quick start to using Bumble, see the [Getting Started](docs/mkdocs/src/getting_started.md) guide.
|
||||
|
||||
### Dependencies
|
||||
|
||||
To install package dependencies needed to run the bumble examples execute the following commands:
|
||||
|
||||
```
|
||||
python -m pip install --upgrade pip
|
||||
python -m pip install ".[test,development,documentation]"
|
||||
```
|
||||
|
||||
### Examples
|
||||
|
||||
Refer to the [Example Documentation](examples/README.md) for details on the included example scripts and how to run them.
|
||||
|
||||
The complete [list of Examples](/docs/mkdocs/src/examples/index.md), and what they are designed to do is here.
|
||||
|
||||
There are also a set of [Apps and Tools](docs/mkdocs/src/apps_and_tools/index.md) that show the utility of Bumble.
|
||||
|
||||
## License
|
||||
|
||||
Licensed under the [Apache 2.0](LICENSE) License.
|
||||
|
||||
@@ -32,6 +32,7 @@ from bumble.core import UUID, AdvertisingData
|
||||
from bumble.device import Device, Connection, Peer
|
||||
from bumble.utils import AsyncRunner
|
||||
from bumble.transport import open_transport_or_link
|
||||
from bumble.gatt import Characteristic
|
||||
|
||||
from prompt_toolkit import Application
|
||||
from prompt_toolkit.history import FileHistory
|
||||
@@ -330,9 +331,24 @@ class ConsoleApp:
|
||||
|
||||
await self.show_attributes(attributes)
|
||||
|
||||
def find_attribute(self, param):
|
||||
parts = param.split('.')
|
||||
if len(parts) == 2:
|
||||
service_uuid = UUID(parts[0]) if parts[0] != '*' else None
|
||||
characteristic_uuid = UUID(parts[1])
|
||||
for service in self.connected_peer.services:
|
||||
if service_uuid is None or service.uuid == service_uuid:
|
||||
for characteristic in service.characteristics:
|
||||
if characteristic.uuid == characteristic_uuid:
|
||||
return characteristic
|
||||
elif len(parts) == 1:
|
||||
if parts[0].startswith('#'):
|
||||
attribute_handle = int(f'{parts[0][1:]}', 16)
|
||||
return attribute_handle
|
||||
|
||||
async def command(self, command):
|
||||
try:
|
||||
(keyword, *params) = command.strip().split(' ', 1)
|
||||
(keyword, *params) = command.strip().split(' ')
|
||||
keyword = keyword.replace('-', '_').lower()
|
||||
handler = getattr(self, f'do_{keyword}', None)
|
||||
if handler:
|
||||
@@ -441,26 +457,46 @@ class ConsoleApp:
|
||||
self.show_error('invalid syntax', 'expected read <attribute>')
|
||||
return
|
||||
|
||||
parts = params[0].split('.')
|
||||
if len(parts) == 2:
|
||||
service_uuid = UUID(parts[0]) if parts[0] != '*' else None
|
||||
characteristic_uuid = UUID(parts[1])
|
||||
for service in self.connected_peer.services:
|
||||
if service_uuid is None or service.uuid == service_uuid:
|
||||
for characteristic in service.characteristics:
|
||||
if characteristic.uuid == characteristic_uuid:
|
||||
value = await self.connected_peer.read_value(characteristic)
|
||||
self.append_to_output(f'VALUE: {value}')
|
||||
return
|
||||
attribute = self.find_attribute(params[0])
|
||||
if attribute is None:
|
||||
self.show_error('no such characteristic')
|
||||
elif len(parts) == 1:
|
||||
if parts[0].startswith('#'):
|
||||
attribute_handle = int(f'{parts[0][1:]}', 16)
|
||||
value = await self.connected_peer.read_value(attribute_handle)
|
||||
self.append_to_output(f'VALUE: {value}')
|
||||
return
|
||||
return
|
||||
|
||||
value = await self.connected_peer.read_value(attribute)
|
||||
self.append_to_output(f'VALUE: {value}')
|
||||
|
||||
async def do_write(self, params):
|
||||
if not self.connected_peer:
|
||||
self.show_error('not connected')
|
||||
return
|
||||
|
||||
if len(params) != 2:
|
||||
self.show_error('invalid syntax', 'expected write <attribute> <value>')
|
||||
return
|
||||
|
||||
if params[1].upper().startswith("0X"):
|
||||
value = bytes.fromhex(params[1][2:]) # parse as hex string
|
||||
else:
|
||||
try:
|
||||
value = int(params[1]) # try as integer
|
||||
except ValueError:
|
||||
value = str.encode(params[1]) # must be a string
|
||||
|
||||
attribute = self.find_attribute(params[0])
|
||||
if attribute is None:
|
||||
self.show_error('no such characteristic')
|
||||
return
|
||||
|
||||
# use write with response if supported
|
||||
with_response = (
|
||||
(attribute.properties & Characteristic.WRITE)
|
||||
if hasattr(attribute, "properties")
|
||||
else False
|
||||
)
|
||||
|
||||
await self.connected_peer.write_value(
|
||||
attribute, value, with_response=with_response
|
||||
)
|
||||
|
||||
async def do_exit(self, params):
|
||||
self.ui.exit()
|
||||
|
||||
Reference in New Issue
Block a user