最佳实践:控制循环模式
模式 A:周期控制 + 缓存状态
最适合:高频控制循环,每次迭代不需要最新状态。完整代码
import time
from motorbridge import Controller, Mode
# ============ 配置 ============
DT_S = 0.02 # 控制周期 20ms = 50Hz
# ==============================
with Controller("can0") as ctrl:
motor = ctrl.add_damiao_motor(0x01, 0x11, "4340P")
ctrl.enable_all()
motor.ensure_mode(Mode.MIT, 1000)
for i in range(1000):
t0 = time.time()
# 1. 发送控制命令
motor.send_mit(pos=0.5, vel=0.0, kp=30.0, kd=1.0, tau=0.0)
# 2. 读取缓存状态(可能是上一次迭代的数据)
state = motor.get_state()
if state:
# 使用状态做控制决策
error = 0.5 - state.pos
print(f"误差: {error:.4f}")
# 3. 保持循环定时
elapsed = time.time() - t0
if elapsed < DT_S:
time.sleep(DT_S - elapsed)
参数详解
| 参数 | 说明 | 推荐值 |
|---|---|---|
DT_S | 控制周期(秒) | 0.01 ~ 0.05 |
| 循环次数 | 总迭代次数 | 根据应用需求 |
何时使用
| 场景 | 适用性 |
|---|---|
| 高频循环(>50Hz) | ✅ 推荐 |
| 状态新鲜度不关键 | ✅ 推荐 |
| 需要最小延迟 | ✅ 推荐 |
| 闭环精确控制 | ⚠️ 考虑模式 B |
模式 B:先请求后读取
最适合:需要最新状态的闭环控制。完整代码
import time
from motorbridge import Controller, Mode
DT_S = 0.02
with Controller("can0") as ctrl:
motor = ctrl.add_damiao_motor(0x01, 0x11, "4340P")
ctrl.enable_all()
motor.ensure_mode(Mode.MIT, 1000)
for i in range(1000):
t0 = time.time()
# 1. 请求新鲜反馈
motor.request_feedback()
# 2. 短暂等待响应
time.sleep(0.001)
# 3. 读取状态(应该是新鲜的)
state = motor.get_state()
if state:
print(f"位置: {state.pos:.3f}")
# 4. 基于新鲜状态发送命令
motor.send_mit(0.5, 0.0, 30.0, 1.0, 0.0)
# 5. 保持定时
elapsed = time.time() - t0
if elapsed < DT_S:
time.sleep(DT_S - elapsed)
时序图
时间线:
├── request_feedback() # 请求反馈
├── sleep(0.001) # 等待 CAN 响应
├── get_state() # 读取新鲜数据
├── send_mit() # 发送命令
└── sleep(剩余时间) # 保持频率
何时使用
| 场景 | 适用性 |
|---|---|
| 基于状态的控制决策 | ✅ 推荐 |
| 闭环控制需要新鲜反馈 | ✅ 推荐 |
| 较低频率循环(<50Hz) | ✅ 推荐 |
| 超高频控制(>100Hz) | ⚠️ 考虑模式 A |
模式 C:多电机批量查询
最适合:多个电机的高效控制。完整代码
import time
from motorbridge import Controller, Mode
DT_S = 0.02
MOTOR_NAMES = ["m1", "m2", "m3", "m4"]
with Controller("can0") as ctrl:
# 创建电机字典
motors = {
name: ctrl.add_damiao_motor(i+1, 0x10+i+1, "4340P")
for i, name in enumerate(MOTOR_NAMES)
}
ctrl.enable_all()
for motor in motors.values():
motor.ensure_mode(Mode.MIT, 1000)
for i in range(1000):
t0 = time.time()
# 1. 先请求所有反馈
for motor in motors.values():
motor.request_feedback()
# 2. 一次 poll 处理所有(高效)
ctrl.poll_feedback_once()
# 3. 读取所有状态
states = {name: motor.get_state() for name, motor in motors.items()}
# 4. 基于状态发送命令
for name, motor in motors.items():
state = states[name]
if state:
# 基于状态的控制
target = 0.5 if state.pos < 0.5 else -0.5
else:
target = 0.0
motor.send_mit(target, 0.0, 30.0, 1.0, 0.0)
# 5. 保持定时
elapsed = time.time() - t0
if elapsed < DT_S:
time.sleep(DT_S - elapsed)
关键点
# 正确顺序:
# 1. 先请求所有
for motor in motors.values():
motor.request_feedback()
# 2. 一次 poll
ctrl.poll_feedback_once()
# 3. 再读取所有
for name, motor in motors.items():
state = motor.get_state()
何时使用
| 场景 | 适用性 |
|---|---|
| 多电机同总线 | ✅ 推荐 |
| 协调控制 | ✅ 推荐 |
| 高效 CAN 总线利用 | ✅ 推荐 |
模式 D:状态机控制
最适合:复杂的控制序列。完整代码
import time
from enum import Enum, auto
from motorbridge import Controller, Mode
class State(Enum):
INIT = auto()
HOMING = auto()
RUNNING = auto()
ERROR = auto()
DT_S = 0.02
with Controller("can0") as ctrl:
motor = ctrl.add_damiao_motor(0x01, 0x11, "4340P")
state = State.INIT
homing_target = 0.0
run_target = 1.0
ctrl.enable_all()
for i in range(1000):
t0 = time.time()
# 读取状态
motor.request_feedback()
motor_state = motor.get_state()
# 状态机逻辑
if state == State.INIT:
print("初始化...")
motor.ensure_mode(Mode.POS_VEL, 1000)
state = State.HOMING
elif state == State.HOMING:
motor.send_pos_vel(homing_target, vlim=1.0)
if motor_state and abs(motor_state.pos - homing_target) < 0.02:
print("回零完成")
state = State.RUNNING
elif state == State.RUNNING:
motor.send_pos_vel(run_target, vlim=1.5)
if motor_state and abs(motor_state.pos - run_target) < 0.02:
run_target = -run_target # 反向
elif state == State.ERROR:
motor.disable()
print("错误状态 - 停止")
break
# 安全检查
if motor_state and motor_state.t_mos > 80:
print(f"过热: {motor_state.t_mos:.1f}°C")
state = State.ERROR
# 保持定时
elapsed = time.time() - t0
if elapsed < DT_S:
time.sleep(DT_S - elapsed)
状态转换图
INIT → HOMING → RUNNING ↔ (往复运动)
↓ ↓ ↓
ERROR ←────────────── (温度过高)
模式 E:优雅降级
最适合:需要容错能力的关键应用。完整代码
import time
from motorbridge import Controller, Mode
from motorbridge.errors import CallError
DT_S = 0.02
MAX_RETRIES = 3
with Controller("can0") as ctrl:
motor = ctrl.add_damiao_motor(0x01, 0x11, "4340P")
ctrl.enable_all()
motor.ensure_mode(Mode.MIT, 1000)
consecutive_errors = 0
for i in range(1000):
t0 = time.time()
try:
# 尝试命令
motor.send_mit(0.5, 0.0, 30.0, 1.0, 0.0)
consecutive_errors = 0
# 读取状态
motor.request_feedback()
state = motor.get_state()
if state:
print(f"位置={state.pos:.3f}")
except CallError as e:
consecutive_errors += 1
print(f"错误 ({consecutive_errors}/{MAX_RETRIES}): {e}")
if consecutive_errors >= MAX_RETRIES:
print("错误过多 - 停止")
break
# 尝试恢复
try:
motor.clear_error()
motor.enable()
except:
pass
# 保持定时
elapsed = time.time() - t0
if elapsed < DT_S:
time.sleep(DT_S - elapsed)
motor.disable()
总线负载管理
避免 CAN 缓冲区溢出(OS Error 105)
如果看到socketcan write failed: No buffer space available (os error 105):
1. 增加控制周期
# 太快了
DT_S = 0.005 # 200Hz - 可能溢出
# 更安全
DT_S = 0.02 # 50Hz - 通常稳定
DT_S = 0.03 # 33Hz - 保守
2. 增加 TX 队列长度
sudo ifconfig can0 txqueuelen 1000
3. 避免多个发送者
# 不好:多个程序同时发送
# 程序 A 和程序 B 都发送命令
# 好:每个 CAN 接口只有一个发送者
4. 减少不必要的反馈请求
# 不好:每次迭代都请求
for i in range(1000):
motor.request_feedback() # 每次都请求
motor.send_mit(...)
# 好:降低请求频率
for i in range(1000):
if i % 5 == 0: # 每 5 次请求一次
motor.request_feedback()
motor.send_mit(...)
CAN 带宽估算
| 电机数 | 频率 | 估算带宽 | 风险 |
|---|---|---|---|
| 1 | 50Hz | ~5% | 低 |
| 4 | 50Hz | ~20% | 中 |
| 8 | 50Hz | ~40% | 中 |
| 4 | 100Hz | ~40% | 中 |
| 8 | 100Hz | ~80% | 高 |
定时最佳实践
测量和调整
import time
DT_S = 0.02 # 目标 50Hz
for i in range(100):
t0 = time.time()
# 你的控制代码
# ...
elapsed = time.time() - t0
sleep_time = DT_S - elapsed
if sleep_time > 0:
time.sleep(sleep_time)
else:
print(f"警告: 超时 {-sleep_time*1000:.1f}ms")
# 记录实际定时
actual_dt = time.time() - t0
if i % 10 == 0:
print(f"实际周期: {actual_dt*1000:.1f}ms ({1/actual_dt:.1f}Hz)")
使用高精度定时器
import time
# 测量优先使用 time.perf_counter()
t0 = time.perf_counter()
# ... 工作 ...
elapsed = time.perf_counter() - t0
安全模式
看门狗模式
import time
LAST_COMMAND_TIME = time.time()
WATCHDOG_TIMEOUT = 0.5 # 秒
def send_command_safe(motor, pos):
"""带看门狗的安全命令发送"""
global LAST_COMMAND_TIME
if time.time() - LAST_COMMAND_TIME > WATCHDOG_TIMEOUT:
print("看门狗超时 - 发送安全命令")
motor.send_mit(0.0, 0.0, 10.0, 1.0, 0.0) # 安全位置
motor.send_mit(pos, 0.0, 30.0, 1.0, 0.0)
LAST_COMMAND_TIME = time.time()
温度保护
TEMP_WARNING = 70.0 # 警告温度
TEMP_CRITICAL = 85.0 # 危险温度
def check_temperature(state):
"""检查温度是否安全"""
if not state:
return True # 无状态无法检查
if state.t_mos > TEMP_CRITICAL or state.t_rotor > TEMP_CRITICAL:
return False # 危险 - 必须停止
if state.t_mos > TEMP_WARNING:
print(f"警告: 高温 ({state.t_mos:.1f}°C)")
return True # OK 继续
下一步
- 故障排除 - 常见问题和解决方案
- 教程 06:实用配方 - 复制粘贴代码
- 版本管理 - 版本兼容性