forked from auracaster/bumble_mirror
pyusb: allow to detect multiple usb dongle
Allow to detect multiple usb dongle by just provind the pid/vid
This commit is contained in:
@@ -23,7 +23,7 @@ import time
|
|||||||
import usb.core
|
import usb.core
|
||||||
import usb.util
|
import usb.util
|
||||||
|
|
||||||
from typing import Optional
|
from typing import Optional, Set
|
||||||
from usb.core import Device as UsbDevice
|
from usb.core import Device as UsbDevice
|
||||||
from usb.core import USBError
|
from usb.core import USBError
|
||||||
from usb.util import CTRL_TYPE_CLASS, CTRL_RECIPIENT_OTHER
|
from usb.util import CTRL_TYPE_CLASS, CTRL_RECIPIENT_OTHER
|
||||||
@@ -46,6 +46,11 @@ RESET_DELAY = 3
|
|||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
# -----------------------------------------------------------------------------
|
||||||
|
# Global
|
||||||
|
# -----------------------------------------------------------------------------
|
||||||
|
devices_in_use: Set[int] = set()
|
||||||
|
|
||||||
|
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
async def open_pyusb_transport(spec: str) -> Transport:
|
async def open_pyusb_transport(spec: str) -> Transport:
|
||||||
@@ -216,6 +221,7 @@ async def open_pyusb_transport(spec: str) -> Transport:
|
|||||||
async def close(self):
|
async def close(self):
|
||||||
await self.source.stop()
|
await self.source.stop()
|
||||||
await self.sink.stop()
|
await self.sink.stop()
|
||||||
|
devices_in_use.remove(device.address)
|
||||||
usb.util.release_interface(self.device, 0)
|
usb.util.release_interface(self.device, 0)
|
||||||
|
|
||||||
usb_find = usb.core.find
|
usb_find = usb.core.find
|
||||||
@@ -233,7 +239,18 @@ async def open_pyusb_transport(spec: str) -> Transport:
|
|||||||
spec = spec[1:]
|
spec = spec[1:]
|
||||||
if ':' in spec:
|
if ':' in spec:
|
||||||
vendor_id, product_id = spec.split(':')
|
vendor_id, product_id = spec.split(':')
|
||||||
device = usb_find(idVendor=int(vendor_id, 16), idProduct=int(product_id, 16))
|
device = None
|
||||||
|
devices = usb_find(
|
||||||
|
find_all=True, idVendor=int(vendor_id, 16), idProduct=int(product_id, 16)
|
||||||
|
)
|
||||||
|
for d in devices:
|
||||||
|
if d.address in devices_in_use:
|
||||||
|
continue
|
||||||
|
device = d
|
||||||
|
devices_in_use.add(d.address)
|
||||||
|
break
|
||||||
|
if device is None:
|
||||||
|
raise ValueError('device already in use')
|
||||||
elif '-' in spec:
|
elif '-' in spec:
|
||||||
|
|
||||||
def device_path(device):
|
def device_path(device):
|
||||||
|
|||||||
Reference in New Issue
Block a user