> ## Documentation Index
> Fetch the complete documentation index at: https://motorbridge.seeedstudio.com/llms.txt
> Use this file to discover all available pages before exploring further.

# Control loop patterns

# Best Practices: Control Loop Patterns

<mintly-toc>
  This guide covers recommended patterns for reliable motor control loops in production environments.
</mintly-toc>

## Pattern A: Periodic Control + Cached State

Best for high-frequency control loops where fresh state is not critical every iteration.

```python theme={null}
import time
from motorbridge import Controller, Mode

DT_S = 0.02  # 50Hz

with Controller("can0") as ctrl:
    motor = ctrl.add_damiao_motor(0x01, 0x11, "4340P")
    ctrl.enable_all()
    motor.ensure_mode(Mode.MIT, 1000)

    for i in range(1000):
        t0 = time.time()

        # Send control command
        motor.send_mit(pos=0.5, vel=0.0, kp=30.0, kd=1.0, tau=0.0)

        # Read cached state (may be from previous iteration)
        state = motor.get_state()

        if state:
            # Use state for control decisions
            error = 0.5 - state.pos
            print(f"Error: {error:.4f}")

        # Maintain loop timing
        elapsed = time.time() - t0
        if elapsed < DT_S:
            time.sleep(DT_S - elapsed)
```

**When to use:**

* High-frequency loops (>50Hz)
* State freshness is not critical
* Minimal latency required

## Pattern B: Request-Then-Read

Forces fresh feedback before reading state.

```python theme={null}
import time
from motorbridge import Controller, Mode

DT_S = 0.02

with Controller("can0") as ctrl:
    motor = ctrl.add_damiao_motor(0x01, 0x11, "4340P")
    ctrl.enable_all()
    motor.ensure_mode(Mode.MIT, 1000)

    for i in range(1000):
        t0 = time.time()

        # Request fresh feedback
        motor.request_feedback()

        # Small delay for response
        time.sleep(0.001)

        # Read state (should be fresh)
        state = motor.get_state()

        if state:
            print(f"Position: {state.pos:.3f}")

        # Send command based on fresh state
        motor.send_mit(0.5, 0.0, 30.0, 1.0, 0.0)

        # Maintain timing
        elapsed = time.time() - t0
        if elapsed < DT_S:
            time.sleep(DT_S - elapsed)
```

**When to use:**

* State-based control decisions
* Closed-loop control requiring fresh feedback
* Lower frequency loops (\<50Hz)

## Pattern C: Multi-Motor Batch Query

Efficient pattern for multiple motors.

```python theme={null}
import time
from motorbridge import Controller, Mode

DT_S = 0.02
MOTORS = ["m1", "m2", "m3", "m4"]

with Controller("can0") as ctrl:
    motors = {
        name: ctrl.add_damiao_motor(i+1, 0x10+i+1, "4340P")
        for i, name in enumerate(MOTORS)
    }

    ctrl.enable_all()
    for motor in motors.values():
        motor.ensure_mode(Mode.MIT, 1000)

    for i in range(1000):
        t0 = time.time()

        # Request all feedbacks first
        for motor in motors.values():
            motor.request_feedback()

        # Single poll for all (efficient)
        ctrl.poll_feedback_once()

        # Read all states
        states = {name: motor.get_state() for name, motor in motors.items()}

        # Send commands based on states
        for name, motor in motors.items():
            state = states[name]
            if state:
                # State-based control
                target = 0.5 if state.pos < 0.5 else -0.5
            else:
                target = 0.0

            motor.send_mit(target, 0.0, 30.0, 1.0, 0.0)

        # Maintain timing
        elapsed = time.time() - t0
        if elapsed < DT_S:
            time.sleep(DT_S - elapsed)
```

**When to use:**

* Multiple motors on same bus
* Coordinated control
* Efficient CAN bus utilization

## Pattern D: State Machine Control

For complex control sequences.

```python theme={null}
import time
from enum import Enum, auto
from motorbridge import Controller, Mode

class State(Enum):
    INIT = auto()
    HOMING = auto()
    RUNNING = auto()
    ERROR = auto()

DT_S = 0.02

with Controller("can0") as ctrl:
    motor = ctrl.add_damiao_motor(0x01, 0x11, "4340P")

    state = State.INIT
    homing_target = 0.0
    run_target = 1.0

    ctrl.enable_all()

    for i in range(1000):
        t0 = time.time()

        # Read state
        motor.request_feedback()
        motor_state = motor.get_state()

        # State machine
        if state == State.INIT:
            print("Initializing...")
            motor.ensure_mode(Mode.POS_VEL, 1000)
            state = State.HOMING

        elif state == State.HOMING:
            motor.send_pos_vel(homing_target, vlim=1.0)
            if motor_state and abs(motor_state.pos - homing_target) < 0.02:
                print("Homing complete")
                state = State.RUNNING

        elif state == State.RUNNING:
            motor.send_pos_vel(run_target, vlim=1.5)
            if motor_state and abs(motor_state.pos - run_target) < 0.02:
                run_target = -run_target  # Reverse

        elif state == State.ERROR:
            motor.disable()
            print("Error state - stopping")
            break

        # Safety check
        if motor_state and motor_state.t_mos > 80:
            print(f"Overheat: {motor_state.t_mos:.1f}°C")
            state = State.ERROR

        # Maintain timing
        elapsed = time.time() - t0
        if elapsed < DT_S:
            time.sleep(DT_S - elapsed)
```

## Pattern E: Graceful Degradation

Continue operation despite errors when safe.

```python theme={null}
import time
from motorbridge import Controller, Mode
from motorbridge.errors import CallError

DT_S = 0.02
MAX_RETRIES = 3

with Controller("can0") as ctrl:
    motor = ctrl.add_damiao_motor(0x01, 0x11, "4340P")
    ctrl.enable_all()
    motor.ensure_mode(Mode.MIT, 1000)

    consecutive_errors = 0

    for i in range(1000):
        t0 = time.time()

        try:
            # Attempt command
            motor.send_mit(0.5, 0.0, 30.0, 1.0, 0.0)
            consecutive_errors = 0

            # Read state
            motor.request_feedback()
            state = motor.get_state()

            if state:
                print(f"pos={state.pos:.3f}")

        except CallError as e:
            consecutive_errors += 1
            print(f"Error ({consecutive_errors}/{MAX_RETRIES}): {e}")

            if consecutive_errors >= MAX_RETRIES:
                print("Too many errors - stopping")
                break

            # Try to recover
            try:
                motor.clear_error()
                motor.enable()
            except:
                pass

        # Maintain timing
        elapsed = time.time() - t0
        if elapsed < DT_S:
            time.sleep(DT_S - elapsed)

    motor.disable()
```

## Bus Load Management

### Avoiding CAN Buffer Overflow (OS Error 105)

If you see `socketcan write failed: No buffer space available (os error 105)`:

**1. Increase control period:**

```python theme={null}
# Too fast for bus
DT_S = 0.005  # 200Hz - may cause overflow

# Safer
DT_S = 0.02   # 50Hz - usually stable
DT_S = 0.03   # 33Hz - conservative
```

**2. Increase TX queue length:**

```bash theme={null}
sudo ifconfig can0 txqueuelen 1000
```

**3. Avoid multiple senders:**

```python theme={null}
# Bad: Multiple programs sending to same CAN
# Program A and Program B both send commands

# Good: Single sender per CAN interface
```

**4. Reduce unnecessary feedback requests:**

```python theme={null}
# Bad: Request every iteration
for i in range(1000):
    motor.request_feedback()  # Every iteration
    motor.send_mit(...)

# Good: Request at lower rate
for i in range(1000):
    if i % 5 == 0:  # Every 5th iteration
        motor.request_feedback()
    motor.send_mit(...)
```

### CAN Bandwidth Estimation

| Motors | Rate  | Est. Bandwidth | Risk   |
| ------ | ----- | -------------- | ------ |
| 1      | 50Hz  | \~5%           | Low    |
| 4      | 50Hz  | \~20%          | Medium |
| 8      | 50Hz  | \~40%          | Medium |
| 4      | 100Hz | \~40%          | Medium |
| 8      | 100Hz | \~80%          | High   |

## Timing Best Practices

### Measure and Adapt

```python theme={null}
import time

DT_S = 0.02  # Target 50Hz

for i in range(100):
    t0 = time.time()

    # Your control code here
    # ...

    elapsed = time.time() - t0
    sleep_time = DT_S - elapsed

    if sleep_time > 0:
        time.sleep(sleep_time)
    else:
        print(f"WARNING: Overrun by {-sleep_time*1000:.1f}ms")

    # Log actual timing
    actual_dt = time.time() - t0
    if i % 10 == 0:
        print(f"Actual period: {actual_dt*1000:.1f}ms ({1/actual_dt:.1f}Hz)")
```

### Use High-Resolution Timer

```python theme={null}
import time

# Prefer time.perf_counter() for timing measurements
t0 = time.perf_counter()
# ... work ...
elapsed = time.perf_counter() - t0
```

## Safety Patterns

### Watchdog Pattern

```python theme={null}
import time

LAST_COMMAND_TIME = time.time()
WATCHDOG_TIMEOUT = 0.5  # seconds

def send_command_safe(motor, pos):
    global LAST_COMMAND_TIME

    if time.time() - LAST_COMMAND_TIME > WATCHDOG_TIMEOUT:
        print("Watchdog timeout - sending safe command")
        motor.send_mit(0.0, 0.0, 10.0, 1.0, 0.0)  # Safe position

    motor.send_mit(pos, 0.0, 30.0, 1.0, 0.0)
    LAST_COMMAND_TIME = time.time()
```

### Temperature Protection

```python theme={null}
TEMP_WARNING = 70.0
TEMP_CRITICAL = 85.0

def check_temperature(state):
    if not state:
        return True  # Can't check without state

    if state.t_mos > TEMP_CRITICAL or state.t_rotor > TEMP_CRITICAL:
        return False  # Critical - must stop

    if state.t_mos > TEMP_WARNING:
        print(f"WARNING: High temperature ({state.t_mos:.1f}°C)")

    return True  # OK to continue
```

## Next Steps

* [Troubleshooting](best-practices/troubleshooting) - Common issues and solutions
* [Tutorial 06: Practical Recipes](tutorials/06-practical-recipes) - Copy-paste code
* [Release and Versioning](best-practices/release-and-versioning) - Version management
