桐乡九九信息网

深入解析Python如何利用PyUSB轻松操控USB设备

发布:2026-04-04 11:27 浏览:0
特别声明:本页信息由用户及第三方发布,真实性、合法性由发布人负责。详情请阅读九九信息网免责条款
详细信息

深入解析:Python如何利用PyUSB轻松操控USB设备

PyUSB是一个纯Python的USB访问模块,提供了对USB设备的底层访问能力。下面我将详细解析PyUSB的使用方法和工作原理。

一、PyUSB基础架构

1. 安装与依赖

# 安装PyUSB
pip install pyusb

# 需要安装后端库(以libusb为例)
# Ubuntu/Debian
sudo apt-get install libusb-1.0-0-dev

# macOS
brew install libusb

# Windows:下载libusb的DLL文件

2. 核心模块结构

import usb.core
import usb.util
import usb.backend

# PyUSB的主要组件:
# - usb.core: 核心功能模块
# - usb.util: 工具函数
# - usb.backend: 后端抽象层
# - usb.control: 控制传输相关

二、USB基础概念

USB设备通信要素

# USB通信的关键参数
VENDOR_ID = 0x1234      # 厂商ID
PRODUCT_ID = 0x5678     # 产品ID

# 端点类型
ENDPOINT_IN = 0x80      # 输入端点
ENDPOINT_OUT = 0x00     # 输出端点

# 传输类型
usb.util.CTRL_TYPE_STANDARD = 0x00
usb.util.CTRL_TYPE_CLASS = 0x20
usb.util.CTRL_TYPE_VENDOR = 0x40
usb.util.CTRL_TYPE_RESERVED = 0x60

三、PyUSB实战代码

1. 设备发现与连接

import usb.core
import usb.util

def find_and_connect_device():
    # 方法1:通过VID/PID查找设备
    dev = usb.core.find(idVendor=0x1234, idProduct=0x5678)

    if dev is None:
        raise ValueError("设备未找到")

    # 方法2:列出所有设备
    devices = usb.core.find(find_all=True)
    for device in devices:
        print(f"发现设备: {hex(device.idVendor)}:{hex(device.idProduct)}")

    # 重置设备
    try:
        dev.reset()
    except usb.core.USBError as e:
        print(f"重置失败: {e}")

    # 设置配置
    dev.set_configuration()

    # 获取设备描述符
    print(f"制造商: {dev.manufacturer}")
    print(f"产品: {dev.product}")
    print(f"序列号: {dev.serial_number}")

    return dev

2. 数据传输示例

class USBDeviceController:
    def __init__(self, vendor_id, product_id):
        self.dev = usb.core.find(idVendor=vendor_id, idProduct=product_id)
        if self.dev is None:
            raise ValueError("设备未找到")

        # 获取接口描述符
        self.cfg = self.dev.get_active_configuration()
        self.intf = self.cfg[(0,0)]  # 第一个接口

        # 查找端点
        self.ep_out = None
        self.ep_in = None

        for endpoint in self.intf:
            if usb.util.endpoint_direction(endpoint.bEndpointAddress) == \
               usb.util.ENDPOINT_OUT:
                self.ep_out = endpoint
            elif usb.util.endpoint_direction(endpoint.bEndpointAddress) == \
                 usb.util.ENDPOINT_IN:
                self.ep_in = endpoint

        if self.ep_out is None or self.ep_in is None:
            raise ValueError("端点未找到")

    def send_data(self, data, timeout=1000):
        """发送数据到设备"""
        # 确保数据是bytes类型
        if isinstance(data, str):
            data = data.encode('utf-8')

        # 写入数据
        written = self.ep_out.write(data, timeout)
        print(f"已发送 {written} 字节")
        return written

    def receive_data(self, size=64, timeout=1000):
        """从设备接收数据"""
        try:
            data = self.ep_in.read(size, timeout)
            return bytes(data)
        except usb.core.USBError as e:
            if e.errno == 110:  # 超时错误
                return None
            raise

    def control_transfer(self, bmRequestType, bRequest, wValue=0, wIndex=0,
                        data_or_wLength=None, timeout=1000):
        """执行控制传输"""
        return self.dev.ctrl_transfer(
            bmRequestType=bmRequestType,
            bRequest=bRequest,
            wValue=wValue,
            wIndex=wIndex,
            data_or_wLength=data_or_wLength,
            timeout=timeout
        )

3. 批量传输和中断传输

class AdvancedUSBCommunicator:
    def __init__(self, dev):
        self.dev = dev
        self.setup_endpoints()

    def setup_endpoints(self):
        """自动配置端点"""
        # 获取配置描述符
        cfg = self.dev.get_active_configuration()
        intf = cfg[(0,0)]

        # 存储所有端点
        self.endpoints = {}

        for ep in intf:
            addr = ep.bEndpointAddress
            ep_type = ep.bmAttributes & 0x03

            if ep_type == usb.util.ENDPOINT_TYPE_BULK:
                self.endpoints[addr] = {
                    'type': 'BULK',
                    'ep': ep,
                    'direction': 'IN' if addr & 0x80 else 'OUT'
                }
            elif ep_type == usb.util.ENDPOINT_TYPE_INTERRUPT:
                self.endpoints[addr] = {
                    'type': 'INTERRUPT',
                    'ep': ep,
                    'direction': 'IN' if addr & 0x80 else 'OUT'
                }

    def bulk_transfer(self, data, endpoint_addr, timeout=1000):
        """批量传输"""
        if endpoint_addr not in self.endpoints:
            raise ValueError(f"端点 {hex(endpoint_addr)} 不存在")

        endpoint = self.endpoints[endpoint_addr]['ep']

        if self.endpoints[endpoint_addr]['direction'] == 'OUT':
            return endpoint.write(data, timeout)
        else:
            return endpoint.read(len(data), timeout)

    def interrupt_transfer(self, data=None, endpoint_addr=None, timeout=1000):
        """中断传输"""
        if endpoint_addr is None:
            # 查找中断端点
            for addr, info in self.endpoints.items():
                if info['type'] == 'INTERRUPT':
                    endpoint_addr = addr
                    break

        if endpoint_addr is None:
            raise ValueError("未找到中断端点")

        endpoint = self.endpoints[endpoint_addr]['ep']

        if data is not None and self.endpoints[endpoint_addr]['direction'] == 'OUT':
            return endpoint.write(data, timeout)
        else:
            return endpoint.read(endpoint.wMaxPacketSize, timeout)

四、实用工具函数

import usb.core
import usb.util

class USBUtils:
    @staticmethod
    def list_all_devices():
        """列出所有USB设备"""
        devices = usb.core.find(find_all=True)
        device_list = []

        for dev in devices:
            try:
                info = {
                    'vendor_id': hex(dev.idVendor),
                    'product_id': hex(dev.idProduct),
                    'manufacturer': dev.manufacturer,
                    'product': dev.product,
                    'serial_number': dev.serial_number,
                    'bus': dev.bus,
                    'address': dev.address
                }
                device_list.append(info)
            except:
                continue

        return device_list

    @staticmethod
    def detach_kernel_driver(dev, interface=0):
        """分离内核驱动"""
        if dev.is_kernel_driver_active(interface):
            try:
                dev.detach_kernel_driver(interface)
                print(f"已分离接口 {interface} 的内核驱动")
                return True
            except usb.core.USBError as e:
                print(f"分离驱动失败: {e}")
                return False
        return True

    @staticmethod
    def claim_interface(dev, interface=0):
        """声明接口"""
        try:
            usb.util.claim_interface(dev, interface)
            return True
        except usb.core.USBError as e:
            print(f"声明接口失败: {e}")
            return False

    @staticmethod
    def release_interface(dev, interface=0):
        """释放接口"""
        try:
            usb.util.release_interface(dev, interface)
            return True
        except usb.core.USBError as e:
            print(f"释放接口失败: {e}")
            return False

    @staticmethod
    def get_endpoint_info(dev):
        """获取端点详细信息"""
        cfg = dev.get_active_configuration()
        interface_number = cfg[(0,0)].bInterfaceNumber

        print(f"接口: {interface_number}")
        print(f"配置: {dev.get_active_configuration().bConfigurationValue}")

        for interface in cfg:
            for endpoint in interface:
                print(f"\n端点地址: {hex(endpoint.bEndpointAddress)}")
                print(f"端点属性: {hex(endpoint.bmAttributes)}")
                print(f"最大包大小: {endpoint.wMaxPacketSize}")
                print(f"轮询间隔: {endpoint.bInterval}")

五、完整示例:USB HID设备通信

import usb.core
import usb.util
import struct
import time

class HIDDeviceHandler:
    # HID特定的请求类型
    HID_GET_REPORT = 0x01
    HID_SET_REPORT = 0x09
    HID_REPORT_TYPE_INPUT = 0x01
    HID_REPORT_TYPE_OUTPUT = 0x02
    HID_REPORT_TYPE_FEATURE = 0x03

    def __init__(self, vendor_id, product_id):
        self.dev = usb.core.find(idVendor=vendor_id, idProduct=product_id)
        if self.dev is None:
            raise ValueError("HID设备未找到")

        # 配置设备
        self.configure_device()

    def configure_device(self):
        """配置HID设备"""
        # 分离内核驱动
        if self.dev.is_kernel_driver_active(0):
            self.dev.detach_kernel_driver(0)

        # 设置配置
        self.dev.set_configuration()

        # 声明接口
        usb.util.claim_interface(self.dev, 0)

    def send_hid_report(self, report_id, data):
        """发送HID报告"""
        # 构造报告数据
        report_data = struct.pack('B', report_id) + data

        # 发送SET_REPORT请求
        self.dev.ctrl_transfer(
            bmRequestType=0x21,  # 主机到设备,类特定,接口
            bRequest=self.HID_SET_REPORT,
            wValue=(self.HID_REPORT_TYPE_OUTPUT << 8) | report_id,
            wIndex=0,
            data_or_wLength=report_data
        )

    def receive_hid_report(self, report_id, size):
        """接收HID报告"""
        # 发送GET_REPORT请求
        data = self.dev.ctrl_transfer(
            bmRequestType=0xA1,  # 设备到主机,类特定,接口
            bRequest=self.HID_GET_REPORT,
            wValue=(self.HID_REPORT_TYPE_INPUT << 8) | report_id,
            wIndex=0,
            data_or_wLength=size
        )

        return bytes(data)

    def read_interrupt(self, endpoint, size=64, timeout=1000):
        """从中断端点读取数据"""
        try:
            data = self.dev.read(endpoint, size, timeout)
            return bytes(data)
        except usb.core.USBError as e:
            print(f"读取错误: {e}")
            return None

    def cleanup(self):
        """清理资源"""
        usb.util.release_interface(self.dev, 0)
        self.dev.attach_kernel_driver(0)

# 使用示例
def hid_example():
    # 示例:与Arduino HID设备通信
    handler = HIDDeviceHandler(0x2341, 0x8036)  # Arduino Leonardo

    try:
        # 发送数据
        handler.send_hid_report(0, b'Hello Arduino!')

        # 接收数据
        time.sleep(0.1)
        data = handler.receive_hid_report(0, 64)
        print(f"收到数据: {data}")

        # 中断传输
        result = handler.read_interrupt(0x81, 64, 1000)
        print(f"中断数据: {result}")

    finally:
        handler.cleanup()

六、错误处理与调试

import usb.core
import usb.util

class RobustUSBConnection:
    def __init__(self, vendor_id, product_id, max_retries=3):
        self.vendor_id = vendor_id
        self.product_id = product_id
        self.max_retries = max_retries
        self.dev = None
        self.connect()

    def connect(self):
        """建立可靠连接"""
        for attempt in range(self.max_retries):
            try:
                self.dev = usb.core.find(
                    idVendor=self.vendor_id,
                    idProduct=self.product_id
                )

                if self.dev is None:
                    raise ConnectionError("设备未找到")

                # 重置设备
                self.dev.reset()

                # 设置配置
                try:
                    self.dev.set_configuration()
                except usb.core.USBError as e:
                    if e.errno == 16:  # 资源忙错误
                        usb.util.dispose_resources(self.dev)
                        self.dev.set_configuration()

                print("USB连接成功")
                return

            except usb.core.USBError as e:
                print(f"连接尝试 {attempt + 1} 失败: {e}")
                if attempt == self.max_retries - 1:
                    raise
                time.sleep(1)

    def safe_transfer(self, transfer_func, *args, **kwargs):
        """安全执行传输操作"""
        for attempt in range(self.max_retries):
            try:
                return transfer_func(*args, **kwargs)
            except usb.core.USBError as e:
                print(f"传输失败 (尝试 {attempt + 1}): {e}")

                # 特定错误处理
                if e.errno == 5:  # 输入/输出错误
                    self.reconnect()
                elif e.errno == 110:  # 超时
                    if attempt == self.max_retries - 1:
                        raise TimeoutError("操作超时")
                else:
                    raise

                time.sleep(0.5)

    def reconnect(self):
        """重新连接设备"""
        print("尝试重新连接...")
        if self.dev:
            usb.util.dispose_resources(self.dev)
        self.connect()

七、性能优化建议

使用适当的超时设置:根据设备特性调整超时时间 批量传输优化:对于大量数据,使用批量传输而非控制传输 缓冲区管理:预分配缓冲区减少内存分配开销 异步操作:考虑使用多线程处理USB通信

八、总结

PyUSB提供了强大的USB设备访问能力,关键要点:

设备发现:使用usb.core.find()查找设备 端点管理:理解IN/OUT端点,正确配置传输方向 传输类型:根据需求选择控制/批量/中断传输 资源管理:正确分离内核驱动,及时释放资源 错误处理:处理各种USB错误,实现健壮的重连机制

通过合理使用PyUSB,可以轻松实现与各种USB设备的通信,从简单的数据采集到复杂的设备控制都能胜任。记得在实际使用中根据具体设备文档调整参数和协议。

相关推荐