> ## Documentation Index
> Fetch the complete documentation index at: https://motorbridge.seeedstudio.com/llms.txt
> Use this file to discover all available pages before exploring further.

# 控制循环模式

<mintly-toc>
  本指南介绍生产环境中可靠电机控制循环的推荐模式，包括定时控制、错误处理和总线负载管理。
</mintly-toc>

## 模式 A：周期控制 + 缓存状态

**最适合：高频控制循环，每次迭代不需要最新状态。**

### 完整代码

```python theme={null}
import time
from motorbridge import Controller, Mode

# ============ 配置 ============
DT_S = 0.02  # 控制周期 20ms = 50Hz
# ==============================

with Controller("can0") as ctrl:
    motor = ctrl.add_damiao_motor(0x01, 0x11, "4340P")

    ctrl.enable_all()
    motor.ensure_mode(Mode.MIT, 1000)

    for i in range(1000):
        t0 = time.time()

        # 1. 发送控制命令
        motor.send_mit(pos=0.5, vel=0.0, kp=30.0, kd=1.0, tau=0.0)

        # 2. 读取缓存状态（可能是上一次迭代的数据）
        state = motor.get_state()

        if state:
            # 使用状态做控制决策
            error = 0.5 - state.pos
            print(f"误差: {error:.4f}")

        # 3. 保持循环定时
        elapsed = time.time() - t0
        if elapsed < DT_S:
            time.sleep(DT_S - elapsed)
```

### 参数详解

| 参数     | 说明      | 推荐值          |
| ------ | ------- | ------------ |
| `DT_S` | 控制周期（秒） | 0.01 \~ 0.05 |
| 循环次数   | 总迭代次数   | 根据应用需求       |

### 何时使用

| 场景          | 适用性       |
| ----------- | --------- |
| 高频循环（>50Hz） | ✅ 推荐      |
| 状态新鲜度不关键    | ✅ 推荐      |
| 需要最小延迟      | ✅ 推荐      |
| 闭环精确控制      | ⚠️ 考虑模式 B |

***

## 模式 B：先请求后读取

**最适合：需要最新状态的闭环控制。**

### 完整代码

```python theme={null}
import time
from motorbridge import Controller, Mode

DT_S = 0.02

with Controller("can0") as ctrl:
    motor = ctrl.add_damiao_motor(0x01, 0x11, "4340P")

    ctrl.enable_all()
    motor.ensure_mode(Mode.MIT, 1000)

    for i in range(1000):
        t0 = time.time()

        # 1. 请求新鲜反馈
        motor.request_feedback()

        # 2. 短暂等待响应
        time.sleep(0.001)

        # 3. 读取状态（应该是新鲜的）
        state = motor.get_state()

        if state:
            print(f"位置: {state.pos:.3f}")

        # 4. 基于新鲜状态发送命令
        motor.send_mit(0.5, 0.0, 30.0, 1.0, 0.0)

        # 5. 保持定时
        elapsed = time.time() - t0
        if elapsed < DT_S:
            time.sleep(DT_S - elapsed)
```

### 时序图

```
时间线:
├── request_feedback()    # 请求反馈
├── sleep(0.001)          # 等待 CAN 响应
├── get_state()           # 读取新鲜数据
├── send_mit()            # 发送命令
└── sleep(剩余时间)        # 保持频率
```

### 何时使用

| 场景             | 适用性       |
| -------------- | --------- |
| 基于状态的控制决策      | ✅ 推荐      |
| 闭环控制需要新鲜反馈     | ✅ 推荐      |
| 较低频率循环（\<50Hz） | ✅ 推荐      |
| 超高频控制（>100Hz）  | ⚠️ 考虑模式 A |

***

## 模式 C：多电机批量查询

**最适合：多个电机的高效控制。**

### 完整代码

```python theme={null}
import time
from motorbridge import Controller, Mode

DT_S = 0.02
MOTOR_NAMES = ["m1", "m2", "m3", "m4"]

with Controller("can0") as ctrl:
    # 创建电机字典
    motors = {
        name: ctrl.add_damiao_motor(i+1, 0x10+i+1, "4340P")
        for i, name in enumerate(MOTOR_NAMES)
    }

    ctrl.enable_all()
    for motor in motors.values():
        motor.ensure_mode(Mode.MIT, 1000)

    for i in range(1000):
        t0 = time.time()

        # 1. 先请求所有反馈
        for motor in motors.values():
            motor.request_feedback()

        # 2. 一次 poll 处理所有（高效）
        ctrl.poll_feedback_once()

        # 3. 读取所有状态
        states = {name: motor.get_state() for name, motor in motors.items()}

        # 4. 基于状态发送命令
        for name, motor in motors.items():
            state = states[name]
            if state:
                # 基于状态的控制
                target = 0.5 if state.pos < 0.5 else -0.5
            else:
                target = 0.0

            motor.send_mit(target, 0.0, 30.0, 1.0, 0.0)

        # 5. 保持定时
        elapsed = time.time() - t0
        if elapsed < DT_S:
            time.sleep(DT_S - elapsed)
```

### 关键点

```python theme={null}
# 正确顺序：
# 1. 先请求所有
for motor in motors.values():
    motor.request_feedback()

# 2. 一次 poll
ctrl.poll_feedback_once()

# 3. 再读取所有
for name, motor in motors.items():
    state = motor.get_state()
```

### 何时使用

| 场景          | 适用性  |
| ----------- | ---- |
| 多电机同总线      | ✅ 推荐 |
| 协调控制        | ✅ 推荐 |
| 高效 CAN 总线利用 | ✅ 推荐 |

***

## 模式 D：状态机控制

**最适合：复杂的控制序列。**

### 完整代码

```python theme={null}
import time
from enum import Enum, auto
from motorbridge import Controller, Mode

class State(Enum):
    INIT = auto()
    HOMING = auto()
    RUNNING = auto()
    ERROR = auto()

DT_S = 0.02

with Controller("can0") as ctrl:
    motor = ctrl.add_damiao_motor(0x01, 0x11, "4340P")

    state = State.INIT
    homing_target = 0.0
    run_target = 1.0

    ctrl.enable_all()

    for i in range(1000):
        t0 = time.time()

        # 读取状态
        motor.request_feedback()
        motor_state = motor.get_state()

        # 状态机逻辑
        if state == State.INIT:
            print("初始化...")
            motor.ensure_mode(Mode.POS_VEL, 1000)
            state = State.HOMING

        elif state == State.HOMING:
            motor.send_pos_vel(homing_target, vlim=1.0)
            if motor_state and abs(motor_state.pos - homing_target) < 0.02:
                print("回零完成")
                state = State.RUNNING

        elif state == State.RUNNING:
            motor.send_pos_vel(run_target, vlim=1.5)
            if motor_state and abs(motor_state.pos - run_target) < 0.02:
                run_target = -run_target  # 反向

        elif state == State.ERROR:
            motor.disable()
            print("错误状态 - 停止")
            break

        # 安全检查
        if motor_state and motor_state.t_mos > 80:
            print(f"过热: {motor_state.t_mos:.1f}°C")
            state = State.ERROR

        # 保持定时
        elapsed = time.time() - t0
        if elapsed < DT_S:
            time.sleep(DT_S - elapsed)
```

### 状态转换图

```
INIT → HOMING → RUNNING ↔ (往复运动)
  ↓       ↓         ↓
ERROR ←────────────── (温度过高)
```

***

## 模式 E：优雅降级

**最适合：需要容错能力的关键应用。**

### 完整代码

```python theme={null}
import time
from motorbridge import Controller, Mode
from motorbridge.errors import CallError

DT_S = 0.02
MAX_RETRIES = 3

with Controller("can0") as ctrl:
    motor = ctrl.add_damiao_motor(0x01, 0x11, "4340P")

    ctrl.enable_all()
    motor.ensure_mode(Mode.MIT, 1000)

    consecutive_errors = 0

    for i in range(1000):
        t0 = time.time()

        try:
            # 尝试命令
            motor.send_mit(0.5, 0.0, 30.0, 1.0, 0.0)
            consecutive_errors = 0

            # 读取状态
            motor.request_feedback()
            state = motor.get_state()

            if state:
                print(f"位置={state.pos:.3f}")

        except CallError as e:
            consecutive_errors += 1
            print(f"错误 ({consecutive_errors}/{MAX_RETRIES}): {e}")

            if consecutive_errors >= MAX_RETRIES:
                print("错误过多 - 停止")
                break

            # 尝试恢复
            try:
                motor.clear_error()
                motor.enable()
            except:
                pass

        # 保持定时
        elapsed = time.time() - t0
        if elapsed < DT_S:
            time.sleep(DT_S - elapsed)

    motor.disable()
```

***

## 总线负载管理

### 避免 CAN 缓冲区溢出（OS Error 105）

如果看到 `socketcan write failed: No buffer space available (os error 105)`：

#### 1. 增加控制周期

```python theme={null}
# 太快了
DT_S = 0.005  # 200Hz - 可能溢出

# 更安全
DT_S = 0.02   # 50Hz - 通常稳定
DT_S = 0.03   # 33Hz - 保守
```

#### 2. 增加 TX 队列长度

```bash theme={null}
sudo ifconfig can0 txqueuelen 1000
```

#### 3. 避免多个发送者

```python theme={null}
# 不好：多个程序同时发送
# 程序 A 和程序 B 都发送命令

# 好：每个 CAN 接口只有一个发送者
```

#### 4. 减少不必要的反馈请求

```python theme={null}
# 不好：每次迭代都请求
for i in range(1000):
    motor.request_feedback()  # 每次都请求
    motor.send_mit(...)

# 好：降低请求频率
for i in range(1000):
    if i % 5 == 0:  # 每 5 次请求一次
        motor.request_feedback()
    motor.send_mit(...)
```

### CAN 带宽估算

| 电机数 | 频率    | 估算带宽  | 风险 |
| --- | ----- | ----- | -- |
| 1   | 50Hz  | \~5%  | 低  |
| 4   | 50Hz  | \~20% | 中  |
| 8   | 50Hz  | \~40% | 中  |
| 4   | 100Hz | \~40% | 中  |
| 8   | 100Hz | \~80% | 高  |

***

## 定时最佳实践

### 测量和调整

```python theme={null}
import time

DT_S = 0.02  # 目标 50Hz

for i in range(100):
    t0 = time.time()

    # 你的控制代码
    # ...

    elapsed = time.time() - t0
    sleep_time = DT_S - elapsed

    if sleep_time > 0:
        time.sleep(sleep_time)
    else:
        print(f"警告: 超时 {-sleep_time*1000:.1f}ms")

    # 记录实际定时
    actual_dt = time.time() - t0
    if i % 10 == 0:
        print(f"实际周期: {actual_dt*1000:.1f}ms ({1/actual_dt:.1f}Hz)")
```

### 使用高精度定时器

```python theme={null}
import time

# 测量优先使用 time.perf_counter()
t0 = time.perf_counter()
# ... 工作 ...
elapsed = time.perf_counter() - t0
```

***

## 安全模式

### 看门狗模式

```python theme={null}
import time

LAST_COMMAND_TIME = time.time()
WATCHDOG_TIMEOUT = 0.5  # 秒

def send_command_safe(motor, pos):
    """带看门狗的安全命令发送"""
    global LAST_COMMAND_TIME

    if time.time() - LAST_COMMAND_TIME > WATCHDOG_TIMEOUT:
        print("看门狗超时 - 发送安全命令")
        motor.send_mit(0.0, 0.0, 10.0, 1.0, 0.0)  # 安全位置

    motor.send_mit(pos, 0.0, 30.0, 1.0, 0.0)
    LAST_COMMAND_TIME = time.time()
```

### 温度保护

```python theme={null}
TEMP_WARNING = 70.0   # 警告温度
TEMP_CRITICAL = 85.0  # 危险温度

def check_temperature(state):
    """检查温度是否安全"""
    if not state:
        return True  # 无状态无法检查

    if state.t_mos > TEMP_CRITICAL or state.t_rotor > TEMP_CRITICAL:
        return False  # 危险 - 必须停止

    if state.t_mos > TEMP_WARNING:
        print(f"警告: 高温 ({state.t_mos:.1f}°C)")

    return True  # OK 继续
```

***

## 下一步

* [故障排除](best-practices/troubleshooting) - 常见问题和解决方案
* [教程 06：实用配方](tutorials/06-practical-recipes) - 复制粘贴代码
* [版本管理](best-practices/release-and-versioning) - 版本兼容性
