Skip to main content

最佳实践:控制循环模式

模式 A:周期控制 + 缓存状态

最适合:高频控制循环,每次迭代不需要最新状态。

完整代码

import time
from motorbridge import Controller, Mode

# ============ 配置 ============
DT_S = 0.02  # 控制周期 20ms = 50Hz
# ==============================

with Controller("can0") as ctrl:
    motor = ctrl.add_damiao_motor(0x01, 0x11, "4340P")

    ctrl.enable_all()
    motor.ensure_mode(Mode.MIT, 1000)

    for i in range(1000):
        t0 = time.time()

        # 1. 发送控制命令
        motor.send_mit(pos=0.5, vel=0.0, kp=30.0, kd=1.0, tau=0.0)

        # 2. 读取缓存状态(可能是上一次迭代的数据)
        state = motor.get_state()

        if state:
            # 使用状态做控制决策
            error = 0.5 - state.pos
            print(f"误差: {error:.4f}")

        # 3. 保持循环定时
        elapsed = time.time() - t0
        if elapsed < DT_S:
            time.sleep(DT_S - elapsed)

参数详解

参数说明推荐值
DT_S控制周期(秒)0.01 ~ 0.05
循环次数总迭代次数根据应用需求

何时使用

场景适用性
高频循环(>50Hz)✅ 推荐
状态新鲜度不关键✅ 推荐
需要最小延迟✅ 推荐
闭环精确控制⚠️ 考虑模式 B

模式 B:先请求后读取

最适合:需要最新状态的闭环控制。

完整代码

import time
from motorbridge import Controller, Mode

DT_S = 0.02

with Controller("can0") as ctrl:
    motor = ctrl.add_damiao_motor(0x01, 0x11, "4340P")

    ctrl.enable_all()
    motor.ensure_mode(Mode.MIT, 1000)

    for i in range(1000):
        t0 = time.time()

        # 1. 请求新鲜反馈
        motor.request_feedback()

        # 2. 短暂等待响应
        time.sleep(0.001)

        # 3. 读取状态(应该是新鲜的)
        state = motor.get_state()

        if state:
            print(f"位置: {state.pos:.3f}")

        # 4. 基于新鲜状态发送命令
        motor.send_mit(0.5, 0.0, 30.0, 1.0, 0.0)

        # 5. 保持定时
        elapsed = time.time() - t0
        if elapsed < DT_S:
            time.sleep(DT_S - elapsed)

时序图

时间线:
├── request_feedback()    # 请求反馈
├── sleep(0.001)          # 等待 CAN 响应
├── get_state()           # 读取新鲜数据
├── send_mit()            # 发送命令
└── sleep(剩余时间)        # 保持频率

何时使用

场景适用性
基于状态的控制决策✅ 推荐
闭环控制需要新鲜反馈✅ 推荐
较低频率循环(<50Hz)✅ 推荐
超高频控制(>100Hz)⚠️ 考虑模式 A

模式 C:多电机批量查询

最适合:多个电机的高效控制。

完整代码

import time
from motorbridge import Controller, Mode

DT_S = 0.02
MOTOR_NAMES = ["m1", "m2", "m3", "m4"]

with Controller("can0") as ctrl:
    # 创建电机字典
    motors = {
        name: ctrl.add_damiao_motor(i+1, 0x10+i+1, "4340P")
        for i, name in enumerate(MOTOR_NAMES)
    }

    ctrl.enable_all()
    for motor in motors.values():
        motor.ensure_mode(Mode.MIT, 1000)

    for i in range(1000):
        t0 = time.time()

        # 1. 先请求所有反馈
        for motor in motors.values():
            motor.request_feedback()

        # 2. 一次 poll 处理所有(高效)
        ctrl.poll_feedback_once()

        # 3. 读取所有状态
        states = {name: motor.get_state() for name, motor in motors.items()}

        # 4. 基于状态发送命令
        for name, motor in motors.items():
            state = states[name]
            if state:
                # 基于状态的控制
                target = 0.5 if state.pos < 0.5 else -0.5
            else:
                target = 0.0

            motor.send_mit(target, 0.0, 30.0, 1.0, 0.0)

        # 5. 保持定时
        elapsed = time.time() - t0
        if elapsed < DT_S:
            time.sleep(DT_S - elapsed)

关键点

# 正确顺序:
# 1. 先请求所有
for motor in motors.values():
    motor.request_feedback()

# 2. 一次 poll
ctrl.poll_feedback_once()

# 3. 再读取所有
for name, motor in motors.items():
    state = motor.get_state()

何时使用

场景适用性
多电机同总线✅ 推荐
协调控制✅ 推荐
高效 CAN 总线利用✅ 推荐

模式 D:状态机控制

最适合:复杂的控制序列。

完整代码

import time
from enum import Enum, auto
from motorbridge import Controller, Mode

class State(Enum):
    INIT = auto()
    HOMING = auto()
    RUNNING = auto()
    ERROR = auto()

DT_S = 0.02

with Controller("can0") as ctrl:
    motor = ctrl.add_damiao_motor(0x01, 0x11, "4340P")

    state = State.INIT
    homing_target = 0.0
    run_target = 1.0

    ctrl.enable_all()

    for i in range(1000):
        t0 = time.time()

        # 读取状态
        motor.request_feedback()
        motor_state = motor.get_state()

        # 状态机逻辑
        if state == State.INIT:
            print("初始化...")
            motor.ensure_mode(Mode.POS_VEL, 1000)
            state = State.HOMING

        elif state == State.HOMING:
            motor.send_pos_vel(homing_target, vlim=1.0)
            if motor_state and abs(motor_state.pos - homing_target) < 0.02:
                print("回零完成")
                state = State.RUNNING

        elif state == State.RUNNING:
            motor.send_pos_vel(run_target, vlim=1.5)
            if motor_state and abs(motor_state.pos - run_target) < 0.02:
                run_target = -run_target  # 反向

        elif state == State.ERROR:
            motor.disable()
            print("错误状态 - 停止")
            break

        # 安全检查
        if motor_state and motor_state.t_mos > 80:
            print(f"过热: {motor_state.t_mos:.1f}°C")
            state = State.ERROR

        # 保持定时
        elapsed = time.time() - t0
        if elapsed < DT_S:
            time.sleep(DT_S - elapsed)

状态转换图

INIT → HOMING → RUNNING ↔ (往复运动)
  ↓       ↓         ↓
ERROR ←────────────── (温度过高)

模式 E:优雅降级

最适合:需要容错能力的关键应用。

完整代码

import time
from motorbridge import Controller, Mode
from motorbridge.errors import CallError

DT_S = 0.02
MAX_RETRIES = 3

with Controller("can0") as ctrl:
    motor = ctrl.add_damiao_motor(0x01, 0x11, "4340P")

    ctrl.enable_all()
    motor.ensure_mode(Mode.MIT, 1000)

    consecutive_errors = 0

    for i in range(1000):
        t0 = time.time()

        try:
            # 尝试命令
            motor.send_mit(0.5, 0.0, 30.0, 1.0, 0.0)
            consecutive_errors = 0

            # 读取状态
            motor.request_feedback()
            state = motor.get_state()

            if state:
                print(f"位置={state.pos:.3f}")

        except CallError as e:
            consecutive_errors += 1
            print(f"错误 ({consecutive_errors}/{MAX_RETRIES}): {e}")

            if consecutive_errors >= MAX_RETRIES:
                print("错误过多 - 停止")
                break

            # 尝试恢复
            try:
                motor.clear_error()
                motor.enable()
            except:
                pass

        # 保持定时
        elapsed = time.time() - t0
        if elapsed < DT_S:
            time.sleep(DT_S - elapsed)

    motor.disable()

总线负载管理

避免 CAN 缓冲区溢出(OS Error 105)

如果看到 socketcan write failed: No buffer space available (os error 105)

1. 增加控制周期

# 太快了
DT_S = 0.005  # 200Hz - 可能溢出

# 更安全
DT_S = 0.02   # 50Hz - 通常稳定
DT_S = 0.03   # 33Hz - 保守

2. 增加 TX 队列长度

sudo ifconfig can0 txqueuelen 1000

3. 避免多个发送者

# 不好:多个程序同时发送
# 程序 A 和程序 B 都发送命令

# 好:每个 CAN 接口只有一个发送者

4. 减少不必要的反馈请求

# 不好:每次迭代都请求
for i in range(1000):
    motor.request_feedback()  # 每次都请求
    motor.send_mit(...)

# 好:降低请求频率
for i in range(1000):
    if i % 5 == 0:  # 每 5 次请求一次
        motor.request_feedback()
    motor.send_mit(...)

CAN 带宽估算

电机数频率估算带宽风险
150Hz~5%
450Hz~20%
850Hz~40%
4100Hz~40%
8100Hz~80%

定时最佳实践

测量和调整

import time

DT_S = 0.02  # 目标 50Hz

for i in range(100):
    t0 = time.time()

    # 你的控制代码
    # ...

    elapsed = time.time() - t0
    sleep_time = DT_S - elapsed

    if sleep_time > 0:
        time.sleep(sleep_time)
    else:
        print(f"警告: 超时 {-sleep_time*1000:.1f}ms")

    # 记录实际定时
    actual_dt = time.time() - t0
    if i % 10 == 0:
        print(f"实际周期: {actual_dt*1000:.1f}ms ({1/actual_dt:.1f}Hz)")

使用高精度定时器

import time

# 测量优先使用 time.perf_counter()
t0 = time.perf_counter()
# ... 工作 ...
elapsed = time.perf_counter() - t0

安全模式

看门狗模式

import time

LAST_COMMAND_TIME = time.time()
WATCHDOG_TIMEOUT = 0.5  # 秒

def send_command_safe(motor, pos):
    """带看门狗的安全命令发送"""
    global LAST_COMMAND_TIME

    if time.time() - LAST_COMMAND_TIME > WATCHDOG_TIMEOUT:
        print("看门狗超时 - 发送安全命令")
        motor.send_mit(0.0, 0.0, 10.0, 1.0, 0.0)  # 安全位置

    motor.send_mit(pos, 0.0, 30.0, 1.0, 0.0)
    LAST_COMMAND_TIME = time.time()

温度保护

TEMP_WARNING = 70.0   # 警告温度
TEMP_CRITICAL = 85.0  # 危险温度

def check_temperature(state):
    """检查温度是否安全"""
    if not state:
        return True  # 无状态无法检查

    if state.t_mos > TEMP_CRITICAL or state.t_rotor > TEMP_CRITICAL:
        return False  # 危险 - 必须停止

    if state.t_mos > TEMP_WARNING:
        print(f"警告: 高温 ({state.t_mos:.1f}°C)")

    return True  # OK 继续

下一步