Skip to main content

Best Practices: Control Loop Patterns

Pattern A: Periodic Control + Cached State

Best for high-frequency control loops where fresh state is not critical every iteration.
import time
from motorbridge import Controller, Mode

DT_S = 0.02  # 50Hz

with Controller("can0") as ctrl:
    motor = ctrl.add_damiao_motor(0x01, 0x11, "4340P")
    ctrl.enable_all()
    motor.ensure_mode(Mode.MIT, 1000)

    for i in range(1000):
        t0 = time.time()

        # Send control command
        motor.send_mit(pos=0.5, vel=0.0, kp=30.0, kd=1.0, tau=0.0)

        # Read cached state (may be from previous iteration)
        state = motor.get_state()

        if state:
            # Use state for control decisions
            error = 0.5 - state.pos
            print(f"Error: {error:.4f}")

        # Maintain loop timing
        elapsed = time.time() - t0
        if elapsed < DT_S:
            time.sleep(DT_S - elapsed)
When to use:
  • High-frequency loops (>50Hz)
  • State freshness is not critical
  • Minimal latency required

Pattern B: Request-Then-Read

Forces fresh feedback before reading state.
import time
from motorbridge import Controller, Mode

DT_S = 0.02

with Controller("can0") as ctrl:
    motor = ctrl.add_damiao_motor(0x01, 0x11, "4340P")
    ctrl.enable_all()
    motor.ensure_mode(Mode.MIT, 1000)

    for i in range(1000):
        t0 = time.time()

        # Request fresh feedback
        motor.request_feedback()

        # Small delay for response
        time.sleep(0.001)

        # Read state (should be fresh)
        state = motor.get_state()

        if state:
            print(f"Position: {state.pos:.3f}")

        # Send command based on fresh state
        motor.send_mit(0.5, 0.0, 30.0, 1.0, 0.0)

        # Maintain timing
        elapsed = time.time() - t0
        if elapsed < DT_S:
            time.sleep(DT_S - elapsed)
When to use:
  • State-based control decisions
  • Closed-loop control requiring fresh feedback
  • Lower frequency loops (<50Hz)

Pattern C: Multi-Motor Batch Query

Efficient pattern for multiple motors.
import time
from motorbridge import Controller, Mode

DT_S = 0.02
MOTORS = ["m1", "m2", "m3", "m4"]

with Controller("can0") as ctrl:
    motors = {
        name: ctrl.add_damiao_motor(i+1, 0x10+i+1, "4340P")
        for i, name in enumerate(MOTORS)
    }

    ctrl.enable_all()
    for motor in motors.values():
        motor.ensure_mode(Mode.MIT, 1000)

    for i in range(1000):
        t0 = time.time()

        # Request all feedbacks first
        for motor in motors.values():
            motor.request_feedback()

        # Single poll for all (efficient)
        ctrl.poll_feedback_once()

        # Read all states
        states = {name: motor.get_state() for name, motor in motors.items()}

        # Send commands based on states
        for name, motor in motors.items():
            state = states[name]
            if state:
                # State-based control
                target = 0.5 if state.pos < 0.5 else -0.5
            else:
                target = 0.0

            motor.send_mit(target, 0.0, 30.0, 1.0, 0.0)

        # Maintain timing
        elapsed = time.time() - t0
        if elapsed < DT_S:
            time.sleep(DT_S - elapsed)
When to use:
  • Multiple motors on same bus
  • Coordinated control
  • Efficient CAN bus utilization

Pattern D: State Machine Control

For complex control sequences.
import time
from enum import Enum, auto
from motorbridge import Controller, Mode

class State(Enum):
    INIT = auto()
    HOMING = auto()
    RUNNING = auto()
    ERROR = auto()

DT_S = 0.02

with Controller("can0") as ctrl:
    motor = ctrl.add_damiao_motor(0x01, 0x11, "4340P")

    state = State.INIT
    homing_target = 0.0
    run_target = 1.0

    ctrl.enable_all()

    for i in range(1000):
        t0 = time.time()

        # Read state
        motor.request_feedback()
        motor_state = motor.get_state()

        # State machine
        if state == State.INIT:
            print("Initializing...")
            motor.ensure_mode(Mode.POS_VEL, 1000)
            state = State.HOMING

        elif state == State.HOMING:
            motor.send_pos_vel(homing_target, vlim=1.0)
            if motor_state and abs(motor_state.pos - homing_target) < 0.02:
                print("Homing complete")
                state = State.RUNNING

        elif state == State.RUNNING:
            motor.send_pos_vel(run_target, vlim=1.5)
            if motor_state and abs(motor_state.pos - run_target) < 0.02:
                run_target = -run_target  # Reverse

        elif state == State.ERROR:
            motor.disable()
            print("Error state - stopping")
            break

        # Safety check
        if motor_state and motor_state.t_mos > 80:
            print(f"Overheat: {motor_state.t_mos:.1f}°C")
            state = State.ERROR

        # Maintain timing
        elapsed = time.time() - t0
        if elapsed < DT_S:
            time.sleep(DT_S - elapsed)

Pattern E: Graceful Degradation

Continue operation despite errors when safe.
import time
from motorbridge import Controller, Mode
from motorbridge.errors import CallError

DT_S = 0.02
MAX_RETRIES = 3

with Controller("can0") as ctrl:
    motor = ctrl.add_damiao_motor(0x01, 0x11, "4340P")
    ctrl.enable_all()
    motor.ensure_mode(Mode.MIT, 1000)

    consecutive_errors = 0

    for i in range(1000):
        t0 = time.time()

        try:
            # Attempt command
            motor.send_mit(0.5, 0.0, 30.0, 1.0, 0.0)
            consecutive_errors = 0

            # Read state
            motor.request_feedback()
            state = motor.get_state()

            if state:
                print(f"pos={state.pos:.3f}")

        except CallError as e:
            consecutive_errors += 1
            print(f"Error ({consecutive_errors}/{MAX_RETRIES}): {e}")

            if consecutive_errors >= MAX_RETRIES:
                print("Too many errors - stopping")
                break

            # Try to recover
            try:
                motor.clear_error()
                motor.enable()
            except:
                pass

        # Maintain timing
        elapsed = time.time() - t0
        if elapsed < DT_S:
            time.sleep(DT_S - elapsed)

    motor.disable()

Bus Load Management

Avoiding CAN Buffer Overflow (OS Error 105)

If you see socketcan write failed: No buffer space available (os error 105): 1. Increase control period:
# Too fast for bus
DT_S = 0.005  # 200Hz - may cause overflow

# Safer
DT_S = 0.02   # 50Hz - usually stable
DT_S = 0.03   # 33Hz - conservative
2. Increase TX queue length:
sudo ifconfig can0 txqueuelen 1000
3. Avoid multiple senders:
# Bad: Multiple programs sending to same CAN
# Program A and Program B both send commands

# Good: Single sender per CAN interface
4. Reduce unnecessary feedback requests:
# Bad: Request every iteration
for i in range(1000):
    motor.request_feedback()  # Every iteration
    motor.send_mit(...)

# Good: Request at lower rate
for i in range(1000):
    if i % 5 == 0:  # Every 5th iteration
        motor.request_feedback()
    motor.send_mit(...)

CAN Bandwidth Estimation

MotorsRateEst. BandwidthRisk
150Hz~5%Low
450Hz~20%Medium
850Hz~40%Medium
4100Hz~40%Medium
8100Hz~80%High

Timing Best Practices

Measure and Adapt

import time

DT_S = 0.02  # Target 50Hz

for i in range(100):
    t0 = time.time()

    # Your control code here
    # ...

    elapsed = time.time() - t0
    sleep_time = DT_S - elapsed

    if sleep_time > 0:
        time.sleep(sleep_time)
    else:
        print(f"WARNING: Overrun by {-sleep_time*1000:.1f}ms")

    # Log actual timing
    actual_dt = time.time() - t0
    if i % 10 == 0:
        print(f"Actual period: {actual_dt*1000:.1f}ms ({1/actual_dt:.1f}Hz)")

Use High-Resolution Timer

import time

# Prefer time.perf_counter() for timing measurements
t0 = time.perf_counter()
# ... work ...
elapsed = time.perf_counter() - t0

Safety Patterns

Watchdog Pattern

import time

LAST_COMMAND_TIME = time.time()
WATCHDOG_TIMEOUT = 0.5  # seconds

def send_command_safe(motor, pos):
    global LAST_COMMAND_TIME

    if time.time() - LAST_COMMAND_TIME > WATCHDOG_TIMEOUT:
        print("Watchdog timeout - sending safe command")
        motor.send_mit(0.0, 0.0, 10.0, 1.0, 0.0)  # Safe position

    motor.send_mit(pos, 0.0, 30.0, 1.0, 0.0)
    LAST_COMMAND_TIME = time.time()

Temperature Protection

TEMP_WARNING = 70.0
TEMP_CRITICAL = 85.0

def check_temperature(state):
    if not state:
        return True  # Can't check without state

    if state.t_mos > TEMP_CRITICAL or state.t_rotor > TEMP_CRITICAL:
        return False  # Critical - must stop

    if state.t_mos > TEMP_WARNING:
        print(f"WARNING: High temperature ({state.t_mos:.1f}°C)")

    return True  # OK to continue

Next Steps