mirror of
https://github.com/Klipper3d/klipper.git
synced 2026-05-07 01:36:22 +02:00
The coeff_frac_bits and scale_frac_bits can be automatically calculated, so there's no need for the callers to specify this information. Signed-off-by: Kevin O'Connor <kevin@koconnor.net>
411 lines
17 KiB
Python
411 lines
17 KiB
Python
# Wrapper around mcu trigger_analog objects
|
|
#
|
|
# Copyright (C) 2025 Gareth Farrington <gareth@waves.ky>
|
|
# Copyright (C) 2026 Kevin O'Connor <kevin@koconnor.net>
|
|
#
|
|
# This file may be distributed under the terms of the GNU GPLv3 license.
|
|
import mcu
|
|
|
|
|
|
######################################################################
|
|
# SOS filters (Second Order Sectional)
|
|
######################################################################
|
|
|
|
MAX_INT32 = (2 ** 31)
|
|
MIN_INT32 = -(2 ** 31) - 1
|
|
def assert_is_int32(value, frac_bits):
|
|
if value > MAX_INT32 or value < MIN_INT32:
|
|
raise OverflowError("Fixed point Q%d.%d overflow"
|
|
% (31-frac_bits, frac_bits))
|
|
return value
|
|
|
|
# convert a floating point value to a 32 bit fixed point representation
|
|
# checks for overflow
|
|
def to_fixed_32(value, frac_bits=0):
|
|
fixed_val = int(round(value * (2**frac_bits)))
|
|
return assert_is_int32(fixed_val, frac_bits)
|
|
|
|
# Determine maximum frac bits for a list of values
|
|
def calc_frac_bits(values):
|
|
if all([v == int(v) for v in values]):
|
|
return 0
|
|
mv = max([abs(v) for v in values])
|
|
frac_bits = 31 - int(mv).bit_length() # 63 - int(mv * (1<<32)).bit_length()
|
|
if frac_bits <= 0:
|
|
return 0
|
|
try:
|
|
validate = [to_fixed_32(v) for v in values]
|
|
except OverflowError as e:
|
|
# Handle rare case where rounding causes an overflow
|
|
return frac_bits - 1
|
|
return frac_bits
|
|
|
|
# Pre-generated SOS filters (avoid Scipy package for common installs)
|
|
GeneratedSOS = {
|
|
('lowpass', 10.0, 4): [
|
|
[0.004824343357716228, 0.009648686715432456, 0.004824343357716228,
|
|
1.0, -1.0485995763626117, 0.2961403575616696],
|
|
[1.0, 2.0, 1.0, 1.0, -1.3209134308194264, 0.6327387928852766],
|
|
],
|
|
}
|
|
|
|
# Helper tool to pre-generate SOS filters. Run with something like:
|
|
# python -c 'import trigger_analog as m; m.pre_gen_filt("lowpass", 250, 25, 4)'
|
|
def pre_gen_filt(btype, sps, freq, order):
|
|
global GeneratedSOS
|
|
GeneratedSOS = {}
|
|
# Create filter
|
|
df = DigitalFilter(sps, ImportError)
|
|
fs = df._butter(freq, btype, order)
|
|
# Write filter info to stdout
|
|
msgs = []
|
|
msgs.append(" ('%s', %s, %d): [" % (btype, repr(float(sps)/freq), order))
|
|
for data in fs:
|
|
coeffs = ", ".join([repr(float(c)) for c in data])
|
|
msgs.append(" [%s]," % coeffs,)
|
|
msgs.append(" ],")
|
|
msgs.append("")
|
|
import sys
|
|
sys.stdout.write("\n".join(msgs))
|
|
|
|
# Digital filter designer and container
|
|
class DigitalFilter:
|
|
def __init__(self, sps, cfg_error):
|
|
self.filter_sections = []
|
|
self.initial_state = None
|
|
self.sample_frequency = sps
|
|
self.cfg_error = cfg_error
|
|
def get_scipy_signal(self):
|
|
try:
|
|
import scipy.signal as signal
|
|
except:
|
|
raise self.cfg_error("DigitalFilter require the SciPy module")
|
|
return signal
|
|
def add_highpass(self, highpass, highpass_order):
|
|
f = self._butter(highpass, "highpass", highpass_order)
|
|
self.filter_sections.extend(f)
|
|
def add_lowpass(self, lowpass, lowpass_order):
|
|
f = self._butter(lowpass, "lowpass", lowpass_order)
|
|
self.filter_sections.extend(f)
|
|
def add_notch(self, notch_freq, notch_quality):
|
|
signal = self.get_scipy_signal()
|
|
b, a = signal.iirnotch(notch_freq, Q=notch_quality,
|
|
fs=self.sample_frequency)
|
|
f = signal.tf2sos(b, a)[0]
|
|
self.filter_sections.append(f)
|
|
def add_derivative(self):
|
|
# Sample to sample difference (derivative) as stage in SOS filter
|
|
self.filter_sections.append((1., -1., 0., 1., 0., 0.))
|
|
def setup_initial_state(self):
|
|
if not self.filter_sections:
|
|
return
|
|
self.initial_state = signal.sosfilt_zi(self.filter_sections)
|
|
def _butter(self, frequency, btype, order):
|
|
key = (btype, float(self.sample_frequency)/frequency, int(order))
|
|
if key in GeneratedSOS:
|
|
return GeneratedSOS[key]
|
|
signal = self.get_scipy_signal()
|
|
return signal.butter(order, Wn=frequency, btype=btype,
|
|
fs=self.sample_frequency, output='sos')
|
|
def get_filter_sections(self):
|
|
return self.filter_sections
|
|
def get_initial_state(self):
|
|
if self.initial_state is None:
|
|
return [[0., 0.]] * len(self.filter_sections)
|
|
return self.initial_state
|
|
def get_size(self):
|
|
return len(self.filter_sections)
|
|
|
|
# Control an `sos_filter` object on the MCU
|
|
class MCU_SosFilter:
|
|
# max_sections should be the largest number of sections you expect
|
|
# to use at runtime.
|
|
def __init__(self, mcu, cmd_queue, max_sections):
|
|
self._mcu = mcu
|
|
self._cmd_queue = cmd_queue
|
|
self._max_sections = max_sections
|
|
# SOS filter "design"
|
|
self._design = None
|
|
self._coeff_frac_bits = 0
|
|
self._last_calc_frac_bits = None
|
|
self._start_value = 0.
|
|
# Offset and scaling
|
|
self._offset = 0
|
|
self._scale = 1.
|
|
self._auto_offset = False
|
|
# MCU commands
|
|
self._oid = self._mcu.create_oid()
|
|
self._set_section_cmd = self._set_state_cmd = None
|
|
self._set_active_cmd = self._set_offset_scale_cmd = None
|
|
self._last_sent_coeffs = [None] * self._max_sections
|
|
self._last_sent_offset_scale = None
|
|
self._mcu.add_config_cmd("config_sos_filter oid=%d max_sections=%d"
|
|
% (self._oid, self._max_sections))
|
|
self._mcu.register_config_callback(self._build_config)
|
|
|
|
def _build_config(self):
|
|
self._set_section_cmd = self._mcu.lookup_command(
|
|
"sos_filter_set_section oid=%c section_idx=%c"
|
|
" sos0=%i sos1=%i sos2=%i sos3=%i sos4=%i", cq=self._cmd_queue)
|
|
self._set_state_cmd = self._mcu.lookup_command(
|
|
"sos_filter_set_state oid=%c section_idx=%c state0=%i state1=%i",
|
|
cq=self._cmd_queue)
|
|
self._set_offset_scale_cmd = self._mcu.lookup_command(
|
|
"sos_filter_set_offset_scale oid=%c offset=%i"
|
|
" scale=%i scale_frac_bits=%c auto_offset=%c", cq=self._cmd_queue)
|
|
self._set_active_cmd = self._mcu.lookup_command(
|
|
"sos_filter_set_active oid=%c n_sections=%c coeff_frac_bits=%c",
|
|
cq=self._cmd_queue)
|
|
|
|
def get_oid(self):
|
|
return self._oid
|
|
|
|
# Determine the frac_bits to use for sos filter coefficients
|
|
def _calc_coeff_frac_bits(self, filter_sections):
|
|
coeffs = sum([list(f) for f in filter_sections], [])
|
|
if not filter_sections or coeffs == self._last_calc_frac_bits:
|
|
return
|
|
self._coeff_frac_bits = calc_frac_bits(coeffs)
|
|
self._last_calc_frac_bits = coeffs
|
|
|
|
# convert the SciPi SOS filters to fixed point format
|
|
def _convert_filter(self):
|
|
if self._design is None:
|
|
return []
|
|
filter_sections = self._design.get_filter_sections()
|
|
self._calc_coeff_frac_bits(filter_sections)
|
|
sos_fixed = []
|
|
for section in filter_sections:
|
|
nun_coeff = len(section)
|
|
if nun_coeff != 6:
|
|
raise ValueError("The number of filter coefficients is %i"
|
|
", must be 6" % (nun_coeff,))
|
|
fixed_section = []
|
|
for col, coeff in enumerate(section):
|
|
if col != 3: # omit column 3
|
|
fixed_coeff = to_fixed_32(coeff, self._coeff_frac_bits)
|
|
fixed_section.append(fixed_coeff)
|
|
elif coeff != 1.0: # double check column 3 is always 1.0
|
|
raise ValueError("Coefficient 3 is expected to be 1.0"
|
|
" but was %f" % (coeff,))
|
|
sos_fixed.append(fixed_section)
|
|
return sos_fixed
|
|
|
|
# convert the SOS filter state matrix (zi) to fixed point format
|
|
def _convert_state(self):
|
|
if self._design is None:
|
|
return []
|
|
filter_state = self._design.get_initial_state()
|
|
sos_state = []
|
|
for section in filter_state:
|
|
nun_states = len(section)
|
|
if nun_states != 2:
|
|
raise ValueError(
|
|
"The number of state elements is %i, must be 2"
|
|
% (nun_states,))
|
|
fixed_state = []
|
|
for col, value in enumerate(section):
|
|
adjval = value * self._start_value
|
|
fixed_state.append(to_fixed_32(adjval))
|
|
sos_state.append(fixed_state)
|
|
return sos_state
|
|
|
|
# Set expected state when filter first starts (avoids filter
|
|
# "ringing" if sensor data has a known static offset)
|
|
def set_start_state(self, start_value):
|
|
self._start_value = start_value
|
|
|
|
# Set conversion of a raw value 1 to a 1.0 value processed by sos filter
|
|
def set_offset_scale(self, offset=0, scale=1., auto_offset=False):
|
|
self._offset = offset
|
|
self._scale = scale
|
|
self._auto_offset = auto_offset
|
|
|
|
# Change the filter coefficients and state at runtime
|
|
def set_filter_design(self, design):
|
|
self._design = design
|
|
|
|
# Resets the filter state back to initial conditions at runtime
|
|
def reset_filter(self):
|
|
# Generate filter parameters
|
|
sos_fixed = self._convert_filter()
|
|
sos_state = self._convert_state()
|
|
num_sections = len(sos_fixed)
|
|
if num_sections > self._max_sections:
|
|
raise ValueError("Too many filter sections: %i, The max is %i"
|
|
% (num_sections, self._max_sections,))
|
|
if len(sos_state) != num_sections:
|
|
raise ValueError("The number of filter sections (%i) and state "
|
|
"sections (%i) must be equal"
|
|
% (num_sections, len(sos_state)))
|
|
# Send section coefficients (if they have changed)
|
|
for i, section in enumerate(sos_fixed):
|
|
args = (self._oid, i, section[0], section[1], section[2],
|
|
section[3], section[4])
|
|
if args == self._last_sent_coeffs[i]:
|
|
continue
|
|
self._set_section_cmd.send(args)
|
|
self._last_sent_coeffs[i] = args
|
|
# Send section initial states
|
|
for i, state in enumerate(sos_state):
|
|
self._set_state_cmd.send([self._oid, i, state[0], state[1]])
|
|
# Send offset/scale (if they have changed)
|
|
scale_frac_bits = calc_frac_bits([self._scale])
|
|
su = to_fixed_32(self._scale, scale_frac_bits)
|
|
args = (self._oid, self._offset, su, scale_frac_bits, self._auto_offset)
|
|
if args != self._last_sent_offset_scale or self._auto_offset:
|
|
self._set_offset_scale_cmd.send(args)
|
|
self._last_sent_offset_scale = args
|
|
# Activate filter
|
|
if self._max_sections:
|
|
self._set_active_cmd.send([self._oid, num_sections,
|
|
self._coeff_frac_bits])
|
|
|
|
|
|
######################################################################
|
|
# Trigger Analog
|
|
######################################################################
|
|
|
|
# MCU_trigger_analog is the interface to `trigger_analog` on the MCU
|
|
class MCU_trigger_analog:
|
|
MONITOR_MAX = 3
|
|
REASON_TRIGGER_ANALOG = mcu.MCU_trsync.REASON_COMMS_TIMEOUT + 1
|
|
def __init__(self, sensor_inst):
|
|
self._printer = sensor_inst.get_mcu().get_printer()
|
|
self._sensor = sensor_inst
|
|
self._mcu = self._sensor.get_mcu()
|
|
self._sos_filter = None
|
|
self._dispatch = mcu.TriggerDispatch(self._mcu)
|
|
self._last_trigger_time = 0.
|
|
# Raw range checking
|
|
self._raw_min = self._raw_max = 0
|
|
self._last_range_args = None
|
|
# Trigger type
|
|
self._trigger_type = "unspecified"
|
|
self._trigger_value = 0.
|
|
self._last_trigger_args = None
|
|
# Error codes from MCU
|
|
self._error_map = {}
|
|
self._sensor_specific_error = 0
|
|
# Configure MCU objects
|
|
self._oid = self._mcu.create_oid()
|
|
self._home_cmd = self._query_state_cmd = None
|
|
self._set_raw_range_cmd = self._set_trigger_cmd = None
|
|
self._mcu.register_config_callback(self._build_config)
|
|
|
|
def setup_sos_filter(self, sos_filter):
|
|
self._sos_filter = sos_filter
|
|
|
|
def _build_config(self):
|
|
self._sensor.setup_trigger_analog(self._oid)
|
|
cmd_queue = self._dispatch.get_command_queue()
|
|
if self._sos_filter is None:
|
|
self.setup_sos_filter(MCU_SosFilter(self._mcu, cmd_queue, 0))
|
|
self._mcu.add_config_cmd(
|
|
"config_trigger_analog oid=%d sos_filter_oid=%d" % (
|
|
self._oid, self._sos_filter.get_oid()))
|
|
# Lookup commands
|
|
self._query_state_cmd = self._mcu.lookup_query_command(
|
|
"trigger_analog_query_state oid=%c",
|
|
"trigger_analog_state oid=%c homing=%c homing_clock=%u",
|
|
oid=self._oid, cq=cmd_queue)
|
|
self._set_raw_range_cmd = self._mcu.lookup_command(
|
|
"trigger_analog_set_raw_range oid=%c raw_min=%i raw_max=%i",
|
|
cq=cmd_queue)
|
|
self._set_trigger_cmd = self._mcu.lookup_command(
|
|
"trigger_analog_set_trigger oid=%c trigger_analog_type=%c"
|
|
" trigger_value=%i", cq=cmd_queue)
|
|
self._home_cmd = self._mcu.lookup_command(
|
|
"trigger_analog_home oid=%c trsync_oid=%c trigger_reason=%c"
|
|
" error_reason=%c clock=%u monitor_ticks=%u monitor_max=%u",
|
|
cq=cmd_queue)
|
|
# Load errors from mcu
|
|
errors = self._mcu.get_enumerations().get("trigger_analog_error:", {})
|
|
self._error_map = {v: k for k, v in errors.items()}
|
|
self._sensor_specific_error = errors.get("SENSOR_SPECIFIC", 0)
|
|
|
|
def get_oid(self):
|
|
return self._oid
|
|
|
|
def get_mcu(self):
|
|
return self._mcu
|
|
|
|
def get_sos_filter(self):
|
|
return self._sos_filter
|
|
|
|
def get_dispatch(self):
|
|
return self._dispatch
|
|
|
|
def get_last_trigger_time(self):
|
|
return self._last_trigger_time
|
|
|
|
def set_trigger(self, trigger_type, trigger_value):
|
|
self._trigger_type = trigger_type
|
|
self._trigger_value = trigger_value
|
|
|
|
def set_raw_range(self, raw_min, raw_max):
|
|
self._raw_min = raw_min
|
|
self._raw_max = raw_max
|
|
|
|
def _reset_filter(self):
|
|
# Update raw range parameters in mcu (if they have changed)
|
|
args = [self._oid, self._raw_min, self._raw_max]
|
|
if args != self._last_range_args:
|
|
self._set_raw_range_cmd.send(args)
|
|
self._last_range_args = args
|
|
# Update trigger in mcu (if it has changed)
|
|
args = [self._oid, self._trigger_type, self._trigger_value]
|
|
if args != self._last_trigger_args:
|
|
self._set_trigger_cmd.send(args)
|
|
self._last_trigger_args = args
|
|
# Update sos filter in mcu
|
|
self._sos_filter.reset_filter()
|
|
|
|
def _clear_home(self):
|
|
self._home_cmd.send([self._oid, 0, 0, 0, 0, 0, 0, 0])
|
|
params = self._query_state_cmd.send([self._oid])
|
|
trigger_ticks = self._mcu.clock32_to_clock64(params['homing_clock'])
|
|
return self._mcu.clock_to_print_time(trigger_ticks)
|
|
|
|
def get_steppers(self):
|
|
return self._dispatch.get_steppers()
|
|
|
|
def home_start(self, print_time, sample_time, sample_count, rest_time,
|
|
triggered=True):
|
|
self._last_trigger_time = 0.
|
|
self._reset_filter()
|
|
trigger_completion = self._dispatch.start(print_time)
|
|
clock = self._mcu.print_time_to_clock(print_time)
|
|
sensor_update = 1. / self._sensor.get_samples_per_second()
|
|
sm_ticks = self._mcu.seconds_to_clock(sensor_update)
|
|
self._home_cmd.send([self._oid, self._dispatch.get_oid(),
|
|
mcu.MCU_trsync.REASON_ENDSTOP_HIT, self.REASON_TRIGGER_ANALOG,
|
|
clock, sm_ticks, self.MONITOR_MAX], reqclock=clock)
|
|
return trigger_completion
|
|
|
|
def home_wait(self, home_end_time):
|
|
self._dispatch.wait_end(home_end_time)
|
|
# trigger has happened, now to find out why...
|
|
res = self._dispatch.stop()
|
|
# clear the homing state so it stops processing samples
|
|
trigger_time = self._clear_home()
|
|
if res >= mcu.MCU_trsync.REASON_COMMS_TIMEOUT:
|
|
if res == mcu.MCU_trsync.REASON_COMMS_TIMEOUT:
|
|
raise self._printer.command_error(
|
|
"Communication timeout during homing")
|
|
error_code = res - self.REASON_TRIGGER_ANALOG
|
|
if error_code >= self._sensor_specific_error:
|
|
sensor_err = error_code - self._sensor_specific_error
|
|
error_msg = self._sensor.lookup_sensor_error(sensor_err)
|
|
else:
|
|
defmsg = "Unknown code %i" % (error_code,)
|
|
error_msg = self._error_map.get(error_code, defmsg)
|
|
raise self._printer.command_error("Trigger analog error: %s"
|
|
% (error_msg,))
|
|
if res != mcu.MCU_trsync.REASON_ENDSTOP_HIT:
|
|
return 0.
|
|
if self._mcu.is_fileoutput():
|
|
trigger_time = home_end_time
|
|
self._last_trigger_time = trigger_time
|
|
return trigger_time
|