Files
klipper/klippy/extras/probe_eddy_current.py
Kevin O'Connor 6b4aeb4c44 probe_eddy_current: Scale intermediate values within the "tap" sos filter
Scale the internal mcu values to units of milli-hz to reduce the
chance of roundoff error in the internal calculations.

Signed-off-by: Kevin O'Connor <kevin@koconnor.net>
2026-04-15 10:39:39 -04:00

943 lines
42 KiB
Python

# Support for eddy current based Z probes
#
# Copyright (C) 2021-2026 Kevin O'Connor <kevin@koconnor.net>
#
# This file may be distributed under the terms of the GNU GPLv3 license.
import sys, logging, math, bisect
import mcu, mathutil
from . import ldc1612, trigger_analog, probe, manual_probe
OUT_OF_RANGE = 99.9
# Tool for calibrating the sensor Z detection and applying that calibration
class EddyCalibration:
def __init__(self, config):
self.printer = config.get_printer()
self.name = config.get_name()
self.drift_comp = DummyDriftCompensation()
# Current calibration data
self.cal_freqs = []
self.cal_zpos = []
cal = config.get('calibrate', None)
if cal is not None:
cal = [list(map(float, d.strip().split(':', 1)))
for d in cal.split(',')]
self.load_calibration(cal)
# Probe calibrate state
self.probe_speed = 0.
# Register commands
cname = self.name.split()[-1]
gcode = self.printer.lookup_object('gcode')
gcode.register_mux_command("PROBE_EDDY_CURRENT_CALIBRATE", "CHIP",
cname, self.cmd_EDDY_CALIBRATE,
desc=self.cmd_EDDY_CALIBRATE_help)
gcode.register_command('Z_OFFSET_APPLY_PROBE',
self.cmd_Z_OFFSET_APPLY_PROBE,
desc=self.cmd_Z_OFFSET_APPLY_PROBE_help)
def get_printer(self):
return self.printer
def verify_calibrated(self):
if len(self.cal_freqs) <= 2:
raise self.printer.command_error(
"Must calibrate probe_eddy_current first")
def load_calibration(self, cal):
cal = sorted([(c[1], c[0]) for c in cal])
self.cal_freqs = [c[0] for c in cal]
self.cal_zpos = [c[1] for c in cal]
def apply_calibration(self, samples):
cur_temp = self.drift_comp.get_temperature()
for i, (samp_time, freq, dummy_z) in enumerate(samples):
adj_freq = self.drift_comp.adjust_freq(freq, cur_temp)
pos = bisect.bisect(self.cal_freqs, adj_freq)
if pos >= len(self.cal_zpos):
zpos = -OUT_OF_RANGE
elif pos == 0:
zpos = OUT_OF_RANGE
else:
# XXX - could further optimize and avoid div by zero
this_freq = self.cal_freqs[pos]
prev_freq = self.cal_freqs[pos - 1]
this_zpos = self.cal_zpos[pos]
prev_zpos = self.cal_zpos[pos - 1]
gain = (this_zpos - prev_zpos) / (this_freq - prev_freq)
offset = prev_zpos - prev_freq * gain
zpos = adj_freq * gain + offset
samples[i] = (samp_time, freq, round(zpos, 6))
def freq_to_height(self, freq):
dummy_sample = [(0., freq, 0.)]
self.apply_calibration(dummy_sample)
return dummy_sample[0][2]
def height_to_freq(self, height):
# XXX - could optimize lookup
rev_zpos = list(reversed(self.cal_zpos))
rev_freqs = list(reversed(self.cal_freqs))
pos = bisect.bisect(rev_zpos, height)
if pos == 0 or pos >= len(rev_zpos):
raise self.printer.command_error(
"Invalid probe_eddy_current height")
this_freq = rev_freqs[pos]
prev_freq = rev_freqs[pos - 1]
this_zpos = rev_zpos[pos]
prev_zpos = rev_zpos[pos - 1]
gain = (this_freq - prev_freq) / (this_zpos - prev_zpos)
offset = prev_freq - prev_zpos * gain
freq = height * gain + offset
return self.drift_comp.unadjust_freq(freq)
def do_calibration_moves(self, move_speed):
toolhead = self.printer.lookup_object('toolhead')
kin = toolhead.get_kinematics()
move = toolhead.manual_move
# Start data collection
msgs = []
is_finished = False
def handle_batch(msg):
if is_finished:
return False
msgs.append(msg)
return True
self.printer.lookup_object(self.name).add_client(handle_batch)
toolhead.dwell(1.)
self.drift_comp.note_z_calibration_start()
# Move to each 40um position
max_z = 4.0
samp_dist = 0.040
req_zpos = [i*samp_dist for i in range(int(max_z / samp_dist) + 1)]
start_pos = toolhead.get_position()
times = []
for zpos in req_zpos:
# Move to next position (always descending to reduce backlash)
hop_pos = list(start_pos)
hop_pos[2] += zpos + 0.500
move(hop_pos, move_speed)
next_pos = list(start_pos)
next_pos[2] += zpos
move(next_pos, move_speed)
# Note sample timing
start_query_time = toolhead.get_last_move_time() + 0.050
end_query_time = start_query_time + 0.100
toolhead.dwell(0.200)
# Find Z position based on actual commanded stepper position
toolhead.flush_step_generation()
kin_spos = {s.get_name(): s.get_commanded_position()
for s in kin.get_steppers()}
kin_pos = kin.calc_position(kin_spos)
times.append((start_query_time, end_query_time, kin_pos[2]))
toolhead.dwell(1.0)
toolhead.wait_moves()
self.drift_comp.note_z_calibration_finish()
# Finish data collection
is_finished = True
# Correlate query responses
cal = {}
step = 0
for msg in msgs:
for query_time, freq, old_z in msg['data']:
# Add to step tracking
while step < len(times) and query_time > times[step][1]:
step += 1
if step < len(times) and query_time >= times[step][0]:
cal.setdefault(times[step][2], []).append(freq)
if len(cal) != len(times):
raise self.printer.command_error(
"Failed calibration - incomplete sensor data")
return cal
def _median(self, values):
values = sorted(values)
n = len(values)
if n % 2 == 0:
return (values[n//2 - 1] + values[n//2]) / 2.0
return values[n // 2]
def calc_freqs(self, meas):
positions = {}
for pos, freqs in meas.items():
count = len(freqs)
freq_avg = float(sum(freqs)) / count
mads = [abs(f - freq_avg) for f in freqs]
mad = self._median(mads)
positions[pos] = (freq_avg, mad, count)
return positions
def validate_calibration_data(self, positions):
last_freq = 40000000.
last_pos = last_mad = .0
gcode = self.printer.lookup_object("gcode")
filtered = []
mad_hz_total = .0
mad_mm_total = .0
samples_count = 0
for pos, (freq_avg, mad_hz, count) in sorted(positions.items()):
if freq_avg > last_freq:
gcode.respond_info(
"Frequency stops decreasing at step %.3f" % (pos))
break
diff_mad = math.sqrt(last_mad**2 + mad_hz**2)
# Calculate if samples have a significant difference
freq_diff = last_freq - freq_avg
last_freq = freq_avg
if freq_diff < 2.5 * diff_mad:
gcode.respond_info(
"Frequency too noisy at step %.3f -> %.3f" % (
last_pos, pos))
gcode.respond_info(
"Frequency diff: %.3f, MAD_Hz: %.3f -> MAD_Hz: %.3f" % (
freq_diff, last_mad, mad_hz
))
break
last_mad = mad_hz
delta_dist = pos - last_pos
last_pos = pos
# MAD is Median Absolute Deviation to Frequency avg ~ delta_hz_1
# Signal is delta_hz_2 / delta_dist
# SNR ~= delta_hz_1 / (delta_hz_2 / delta_mm) = d_1 * d_mm / d_2
mad_mm = mad_hz * delta_dist / freq_diff
filtered.append((pos, freq_avg, mad_hz, mad_mm))
mad_hz_total += mad_hz
mad_mm_total += mad_mm
samples_count += count
avg_mad = mad_hz_total / len(filtered)
avg_mad_mm = mad_mm_total / len(filtered)
gcode.respond_info(
"probe_eddy_current: noise %.6fmm, MAD_Hz=%.3f in %d queries\n" % (
avg_mad_mm, avg_mad, samples_count))
freq_list = [freq for _, freq, _, _ in filtered]
freq_diff = max(freq_list) - min(freq_list)
gcode.respond_info("Total frequency range: %.3f Hz\n" % (freq_diff))
points = [0.25, 0.5, 1.0, 2.0, 3.0]
for pos, _, mad_hz, mad_mm in filtered:
if len(points) and points[0] <= pos:
points.pop(0)
msg = "z: %.3f # noise %.6fmm, MAD_Hz=%.3f\n" % (
pos, mad_mm, mad_hz)
gcode.respond_info(msg)
return filtered
def post_manual_probe(self, mpresult):
if mpresult is None:
# Manual Probe was aborted
return
curpos = [mpresult.bed_x, mpresult.bed_y, mpresult.bed_z]
move = self.printer.lookup_object('toolhead').manual_move
# Move away from the bed
probe_calibrate_z = curpos[2]
curpos[2] += 5.
move(curpos, self.probe_speed)
# Move sensor over nozzle position
pprobe = self.printer.lookup_object("probe")
x_offset, y_offset, z_offset = pprobe.get_offsets()
curpos[0] -= x_offset
curpos[1] -= y_offset
move(curpos, self.probe_speed)
# Descend back to bed
curpos[2] -= 5. - 0.050
move(curpos, self.probe_speed)
# Perform calibration movement and capture
cal = self.do_calibration_moves(self.probe_speed)
# Calculate each sample position average and variance
_positions = self.calc_freqs(cal)
# Fix Z position offset
positions = {}
for k in _positions:
v = _positions[k]
k = k - probe_calibrate_z
positions[k] = v
filtered = self.validate_calibration_data(positions)
if len(filtered) <= 8:
raise self.printer.command_error(
"Failed calibration - No usable data")
z_freq_pairs = [(pos, freq) for pos, freq, _, _ in filtered]
self._save_calibration(z_freq_pairs)
def _save_calibration(self, z_freq_pairs):
gcode = self.printer.lookup_object("gcode")
gcode.respond_info(
"The SAVE_CONFIG command will update the printer config file\n"
"and restart the printer.")
# Save results
cal_contents = []
for i, (pos, freq) in enumerate(z_freq_pairs):
if not i % 3:
cal_contents.append('\n')
cal_contents.append("%.6f:%.3f" % (pos, freq))
cal_contents.append(',')
cal_contents.pop()
configfile = self.printer.lookup_object('configfile')
configfile.set(self.name, 'calibrate', ''.join(cal_contents))
def _save_tap_z_offset(self, gcmd, homing_z):
eventtime = self.printer.get_reactor().monotonic()
configfile = self.printer.lookup_object('configfile')
cstatus = configfile.get_status(eventtime)
csettings = cstatus.get('settings', {}).get(self.name, {})
tap_z_offset = csettings.get('tap_z_offset', 0.)
new_calibrate = tap_z_offset - homing_z
gcmd.respond_info(
"%s: tap_z_offset: %.3f\n"
"The SAVE_CONFIG command will update the printer config file\n"
"with the above and restart the printer."
% (self.name, new_calibrate))
configfile.set(self.name, 'tap_z_offset', "%.3f" % (new_calibrate,))
cmd_EDDY_CALIBRATE_help = "Calibrate eddy current probe"
def cmd_EDDY_CALIBRATE(self, gcmd):
self.probe_speed = gcmd.get_float("PROBE_SPEED", 5., above=0.)
# Start manual probe
manual_probe.ManualProbeHelper(self.printer, gcmd,
self.post_manual_probe)
cmd_Z_OFFSET_APPLY_PROBE_help = "Adjust the probe's z_offset"
def cmd_Z_OFFSET_APPLY_PROBE(self, gcmd):
gcode_move = self.printer.lookup_object("gcode_move")
offset = gcode_move.get_status()['homing_origin'].z
if offset == 0:
gcmd.respond_info("Nothing to do: Z Offset is 0")
return
if gcmd.get("METHOD", "").lower() == "tap":
self._save_tap_z_offset(gcmd, offset)
return
cal_zpos = [z - offset for z in self.cal_zpos]
z_freq_pairs = zip(cal_zpos, self.cal_freqs)
z_freq_pairs = sorted(z_freq_pairs)
self._save_calibration(z_freq_pairs)
def register_drift_compensation(self, comp):
self.drift_comp = comp
# Tool to gather samples and convert them to probe positions
class EddyGatherSamples:
def __init__(self, printer, sensor_helper):
self._printer = printer
self._sensor_helper = sensor_helper
# Sensor reading
self._sensor_messages = []
self._need_stop = False
# Probe request and results storage
self._probe_requests = []
self._analysis_results = []
# Start samples
sensor_helper.add_client(self._add_sensor_message)
# Sensor reading and measurement extraction
def _add_sensor_message(self, msg):
if self._need_stop:
del self._sensor_messages[:]
return False
self._sensor_messages.append(msg)
self._check_sensor_messages()
return True
def finish(self):
self._need_stop = True
def _pull_measurements(self, start_time, end_time):
# Extract measurements from sensor messages for given time range
measures = []
msg_num = discard_msgs = 0
while msg_num < len(self._sensor_messages):
msg = self._sensor_messages[msg_num]
msg_num += 1
data = msg['data']
if data[0][0] > end_time:
break
if data[-1][0] < start_time:
discard_msgs = msg_num
continue
for measure in data:
time = measure[0]
if time < start_time:
continue
if time > end_time:
break
measures.append(measure)
del self._sensor_messages[:discard_msgs]
return measures
def _check_sensor_messages(self):
while self._sensor_messages and self._probe_requests:
cb, start_time, end_time, args = self._probe_requests[0]
if self._sensor_messages[-1]['data'][-1][0] < end_time:
break
measures = self._pull_measurements(start_time, end_time)
errmsg = res = None
try:
# Call analysis callback to process measurements
res = cb(measures, *args)
except self._printer.command_error as e:
# Defer raising of errors to pull_probed()
errmsg = str(e)
self._analysis_results.append((res, errmsg))
self._probe_requests.pop(0)
def add_probe_request(self, cb, start_time, end_time, *args):
self._probe_requests.append((cb, start_time, end_time, args))
self._check_sensor_messages()
# Extract probe results
def _await_sensor_messages(self):
# Make sure enough samples have been collected
reactor = self._printer.get_reactor()
mcu = self._sensor_helper.get_mcu()
while self._probe_requests:
cb, start_time, end_time, args = self._probe_requests[0]
systime = reactor.monotonic()
est_print_time = mcu.estimated_print_time(systime)
if est_print_time > end_time + 1.0:
raise self._printer.command_error(
"probe_eddy_current sensor outage")
if mcu.is_fileoutput():
# In debugging mode - just create dummy response
dummy_pr = manual_probe.create_probe_result((0., 0., 0.,))
self._analysis_results.append((dummy_pr, None))
self._probe_requests.pop(0)
continue
reactor.pause(systime + 0.010)
def pull_probed(self):
self._await_sensor_messages()
results = []
for res, errmsg in self._analysis_results:
if errmsg is not None:
raise self._printer.command_error(errmsg)
results.append(res)
del self._analysis_results[:]
return results
# Generate a ProbeResult from the average of a set of measurements
def probe_results_from_avg(measures, toolhead_pos, calibration, offsets):
cmderr = calibration.get_printer().command_error
if not measures:
raise cmderr("Unable to obtain probe_eddy_current sensor readings")
# Determine average of measurements
freq_sum = sum([m[1] for m in measures])
freq_avg = freq_sum / len(measures)
# Determine height associated with frequency
sensor_z = calibration.freq_to_height(freq_avg)
if sensor_z <= -OUT_OF_RANGE or sensor_z >= OUT_OF_RANGE:
raise cmderr("probe_eddy_current sensor not in valid range")
return manual_probe.create_probe_result(toolhead_pos,
(offsets[0], offsets[1], sensor_z))
MAX_VALID_RAW_VALUE=0x03ffffff
# Helper for implementing PROBE style commands (descend until trigger)
class EddyDescend:
def __init__(self, config, sensor_helper, calibration,
probe_offsets, param_helper, trigger_analog):
self._printer = config.get_printer()
self._sensor_helper = sensor_helper
self._calibration = calibration
self._probe_offsets = probe_offsets
self._param_helper = param_helper
self._trigger_analog = trigger_analog
if (config.get('z_offset', None, note_valid=False) is not None
and config.get('descend_z', None, note_valid=False) is None):
config.deprecate('z_offset')
self._descend_z = config.getfloat('z_offset', above=0.)
else:
self._descend_z = config.getfloat('descend_z', above=0.)
self._z_min_position = probe.lookup_minimum_z(config)
self._gather = None
def _prep_trigger_analog(self):
sos_filter = self._trigger_analog.get_sos_filter()
sos_filter.set_filter_design(None)
sos_filter.set_offset_scale(0, 1.)
self._trigger_analog.set_raw_range(0, MAX_VALID_RAW_VALUE)
trigger_freq = self._calibration.height_to_freq(self._descend_z)
conv_freq = self._sensor_helper.convert_frequency_to_raw(trigger_freq)
self._trigger_analog.set_trigger('gt', conv_freq)
# Probe session interface
def start_probe_session(self, gcmd):
self._calibration.verify_calibrated()
self._prep_trigger_analog()
self._gather = EddyGatherSamples(self._printer, self._sensor_helper)
return self
def run_probe(self, gcmd):
toolhead = self._printer.lookup_object('toolhead')
pos = toolhead.get_position()
pos[2] = self._z_min_position
speed = self._param_helper.get_probe_params(gcmd)['probe_speed']
# Perform probing move
phoming = self._printer.lookup_object('homing')
trig_pos = phoming.probing_move(self._trigger_analog, pos, speed)
# Extract samples
start_time = self._trigger_analog.get_last_trigger_time() + 0.050
end_time = start_time + 0.100
toolhead_pos = toolhead.get_position()
offsets = self._probe_offsets.get_offsets()
self._gather.add_probe_request(probe_results_from_avg,
start_time, end_time,
toolhead_pos, self._calibration, offsets)
def pull_probed_results(self):
return self._gather.pull_probed()
def end_probe_session(self):
self._gather.finish()
self._gather = None
# Wrapper to emulate mcu_endstop for probe:z_virtual_endstop
# Note that this does not provide accurate results
class EddyEndstopWrapper:
def __init__(self, sensor_helper, eddy_descend):
self._sensor_helper = sensor_helper
self._eddy_descend = eddy_descend
self._hw_probe_session = None
# Interface for MCU_endstop
def get_mcu(self):
return self._sensor_helper.get_mcu()
def add_stepper(self, stepper):
pass
def get_steppers(self):
return self._eddy_descend._trigger_analog.get_steppers()
def home_start(self, print_time, sample_time, sample_count, rest_time,
triggered=True):
return self._eddy_descend._trigger_analog.home_start(
print_time, sample_time, sample_count, rest_time, triggered)
def home_wait(self, home_end_time):
return self._eddy_descend._trigger_analog.home_wait(home_end_time)
def query_endstop(self, print_time):
return False # XXX
# Interface for HomingViaProbeHelper
def multi_probe_begin(self):
self._hw_probe_session = self._eddy_descend.start_probe_session(None)
def multi_probe_end(self):
self._hw_probe_session.end_probe_session()
self._hw_probe_session = None
def probe_prepare(self, hmove):
pass
def probe_finish(self, hmove):
pass
def get_position_endstop(self):
return self._eddy_descend._descend_z
# Probing helper for "tap" requests
class EddyTap:
def __init__(self, config, sensor_helper, param_helper, trigger_analog):
self._printer = config.get_printer()
self._sensor_helper = sensor_helper
self._param_helper = param_helper
self._trigger_analog = trigger_analog
self._z_min_position = probe.lookup_minimum_z(config)
self._gather = None
self._filter_design = None
self._tap_z_offset = config.getfloat('tap_z_offset', 0.)
self._tap_threshold = config.getfloat('tap_threshold', 0., above=0.)
self._least_squares_cache = {}
self._current_tap_threshold = 0.
if self._tap_threshold:
self._setup_tap()
# Setup for "tap" probe request
def _setup_tap(self):
# Create sos filter "design"
cfg_error = self._printer.config_error
sps = self._sensor_helper.get_samples_per_second()
design = trigger_analog.DigitalFilter(sps, cfg_error)
design.add_lowpass(25.0, 4)
design.add_derivative()
self._filter_design = design
# Create SOS filter
cmd_queue = self._trigger_analog.get_dispatch().get_command_queue()
mcu = self._sensor_helper.get_mcu()
filter_size = design.get_size()
sos_filter = trigger_analog.MCU_SosFilter(mcu, cmd_queue, filter_size)
self._trigger_analog.setup_sos_filter(sos_filter)
def _prep_trigger_analog_tap(self, gcmd):
if not self._tap_threshold:
raise self._printer.command_error("Tap not configured")
# Setup mcu filter (scale internal values to milli-hz)
sos_filter = self._trigger_analog.get_sos_filter()
sos_filter.set_filter_design(self._filter_design)
FRAC_HZ = 1000.
s = FRAC_HZ * self._sensor_helper.convert_raw_to_frequency(1)
sos_filter.set_offset_scale(0, s, auto_offset=True)
self._trigger_analog.set_raw_range(0, MAX_VALID_RAW_VALUE)
# Set mcu trigger to tap_threshold
tap_threshold = gcmd.get_float("TAP_THRESHOLD",
self._tap_threshold, above=0.)
samp_thresh = int(FRAC_HZ * tap_threshold + 0.5)
self._trigger_analog.set_trigger('diff_peak_gt', samp_thresh)
self._current_tap_threshold = tap_threshold
# Measurement analysis to determine "tap" position
def _validate_samples_time(self, measures, start_time, end_time):
cmderr = self._printer.command_error
if end_time - start_time < 0.100:
raise cmderr("Tap detected too close to start of move")
timestamps = [m[0] for m in measures]
if len(timestamps) < 2:
raise cmderr("Unable to obtain probe_eddy_current sensor readings")
ts = [start_time] + timestamps + [end_time]
tdiffs = [ts[i] - ts[i-1] for i in range(1, len(ts))]
tmax = max(tdiffs)
tmin = min(tdiffs[1:-1])
cycle_time = 1.0 / self._sensor_helper.get_samples_per_second()
if tmax > cycle_time * 1.25:
raise cmderr("Eddy: Gaps in the data: %.3f > %.3f"
% (tmax, cycle_time * 1.25))
if tmin < cycle_time * 0.75:
raise cmderr("Eddy: CLKIN frequency too low: %.3f < %.3f"
% (tmin, cycle_time * 0.75))
def _lookup_toolhead_pos(self, pos_time):
toolhead = self._printer.lookup_object('toolhead')
kin = toolhead.get_kinematics()
kin_spos = {s.get_name(): s.mcu_to_commanded_position(
s.get_past_mcu_position(pos_time))
for s in kin.get_steppers()}
return kin.calc_position(kin_spos)
def _build_ls_matrix(self, samples, est_z_contact):
# The function here is only a reference for the optimized version below
len_samples = len(samples)
eqs = [[0.] * 4 for i in range(len_samples)]
ans = [[0.] for i in range(len_samples)]
for i, (step_z, sensor_freq) in enumerate(samples):
ans[i][0] = sensor_freq
eq = eqs[i]
eq[0] = 1.
if step_z <= est_z_contact:
# 1*c0 + (z-ezc)*c1 + ezc*c2 + ezc*ezc*c3 = freq
eq[1] = step_z - est_z_contact
eq[2] = est_z_contact
eq[3] = est_z_contact * est_z_contact
else:
# 1*c0 + 0*c1 + z*c2 + z*z*c3 = freq
eq[1] = 0.
eq[2] = step_z
eq[3] = step_z * step_z
eqst = mathutil.mat_transp(eqs)
eqst_eqs = mathutil.mat_mat_mul(eqst, eqs)
eqst_ans = mathutil.mat_mat_mul(eqst, ans)
return eqst_eqs, eqst_ans
def _build_sums(self, samples, num_le):
sum_le_z = sum_le_z2 = sum_le_freq = sum_le_freq_z = 0.
for z, freq in samples[:num_le]:
sum_le_z += z
sum_le_z2 += z**2
sum_le_freq += freq
sum_le_freq_z += freq*z
sum_gt_z = sum_gt_z2 = sum_gt_z3 = sum_gt_z4 = 0.
sum_gt_freq = sum_gt_freq_z = sum_gt_freq_z2 = 0.
for z, freq in samples[num_le:]:
sum_gt_z += z
sum_gt_z2 += z**2
sum_gt_z3 += z**3
sum_gt_z4 += z**4
sum_gt_freq += freq
sum_gt_freq_z += freq*z
sum_gt_freq_z2 += freq * z**2
return (sum_le_z, sum_le_z2, sum_le_freq, sum_le_freq_z,
sum_gt_z, sum_gt_z2, sum_gt_z3, sum_gt_z4,
sum_gt_freq, sum_gt_freq_z, sum_gt_freq_z2)
def _build_ls_matrix_opt(self, samples, est_z_contact):
# This function is an optimized version of _build_ls_matrix()
num_le = bisect.bisect(samples, (est_z_contact, sys.float_info.max))
# Check for previously calculated raw freq/z counters
sums = self._least_squares_cache.get(num_le)
if sums is None:
sums = self._build_sums(samples, num_le)
self._least_squares_cache[num_le] = sums
(sum_le_z, sum_le_z2, sum_le_freq, sum_le_freq_z,
sum_gt_z, sum_gt_z2, sum_gt_z3, sum_gt_z4,
sum_gt_freq, sum_gt_freq_z, sum_gt_freq_z2) = sums
num_samples = len(samples)
ezc = est_z_contact
ezc2 = ezc**2
ezc3 = ezc**3
ezc4 = ezc**4
# Build matrices for least squares evaluation
eqst_eqs = [[0.] * 4 for i in range(4)]
eqst_eqs[0][0] = num_samples
eqst_eqs[1][1] = sum_le_z2 - 2*ezc*sum_le_z + num_le*ezc2
eqst_eqs[2][2] = sum_gt_z2 + num_le*ezc2
eqst_eqs[3][3] = sum_gt_z4 + num_le*ezc4
eqst_eqs[0][1] = eqst_eqs[1][0] = sum_le_z - num_le*ezc
eqst_eqs[0][2] = eqst_eqs[2][0] = sum_gt_z + num_le*ezc
eqst_eqs[0][3] = eqst_eqs[3][0] = sum_gt_z2 + num_le*ezc2
eqst_eqs[2][3] = eqst_eqs[3][2] = sum_gt_z3 + num_le*ezc3
eqst_eqs[2][1] = eqst_eqs[1][2] = ezc * eqst_eqs[0][1]
eqst_eqs[3][1] = eqst_eqs[1][3] = ezc2 * eqst_eqs[0][1]
eqst_ans = [[0.] for i in range(4)]
eqst_ans[0][0] = sum_le_freq + sum_gt_freq
eqst_ans[1][0] = sum_le_freq_z - ezc*sum_le_freq
eqst_ans[2][0] = sum_gt_freq_z + ezc*sum_le_freq
eqst_ans[3][0] = sum_gt_freq_z2 + ezc2 * sum_le_freq
return eqst_eqs, eqst_ans
def _calc_least_squares(self, samples, est_z_contact):
eqst_eqs, eqst_ans = self._build_ls_matrix_opt(samples, est_z_contact)
coeffs = mathutil.gaussian_solve(eqst_eqs, eqst_ans)
if coeffs is not None and coeffs[3][0] < 0.:
# z**2 factor can't be negative - retry using only linear
alt_eqst_eqs = [ee[:3] for ee in eqst_eqs[:3]]
alt_eqst_ans = eqst_ans[:3]
coeffs = mathutil.gaussian_solve(alt_eqst_eqs, alt_eqst_ans)
if coeffs is not None:
coeffs = coeffs + [[0.]]
if coeffs is None:
return sys.float_info.max, [[0.]] * 4
rel_err = -sum([c[0]*a[0] for c, a in zip(coeffs, eqst_ans)])
return rel_err, coeffs
def _find_least_squares(self, data):
#for d in data:
# logging.info("sample: freq=%.3f z=%.6f", d[0], d[1][2])
self._least_squares_cache.clear()
# Change base of freq/z measurements to improve numerical stability
base_z = .5 * (data[0][1][2] + data[-1][1][2])
base_freq = .5 * (data[0][0] + data[-1][0])
samples = [(d[1][2] - base_z, d[0] - base_freq) for d in data]
# Run least squares with various z values to reduce residual error
min_z = best_z = samples[0][0]
max_z = samples[-1][0]
best_err = sys.float_info.max
best_coeffs = [0., 0., 0., 0.]
while max_z - min_z > 0.000050:
# Select z value to check
mid_z = (min_z + max_z) * .5
if best_z < mid_z:
guess_z = (best_z + max_z) * .5
else:
guess_z = (min_z + best_z) * .5
# Calculate least squares error for given z
guess_err, coeffs = self._calc_least_squares(samples, guess_z)
# Update search bounds
if guess_err < best_err:
if guess_z > best_z:
min_z = best_z
else:
max_z = best_z
best_z = guess_z
best_err = guess_err
best_coeffs = coeffs
else:
if guess_z > best_z:
max_z = guess_z
else:
min_z = guess_z
self._least_squares_cache.clear()
# Return to original freq/z measurement base
bc = [v[0] for v in best_coeffs]
final_coeffs = (base_z + best_z,
base_freq + bc[0] + best_z*bc[2] + best_z*best_z*bc[3],
bc[1], bc[2] + 2.*best_z*bc[3], bc[3])
#logging.info("probe_analysis: coeffs=%s", final_coeffs)
return final_coeffs
def _error_detect(self, msg):
raise self._printer.command_error("Unable to detect tap: %s" % (msg,))
def _analyze_pullback(self, measures, start_time, end_time, speed):
reactor = self._printer.get_reactor()
self._validate_samples_time(measures, start_time, end_time)
# Correlate measurements to toolhead position at time of measurement
data = [(sensor_freq, self._lookup_toolhead_pos(samp_time))
for samp_time, sensor_freq, sensor_z in measures]
reactor.pause(0.)
min_z = data[0][1][2]
max_z = data[-1][1][2]
if max_z - min_z < 0.350:
self._error_detect("insufficient lift (%.6f vs %.6f)"
% (max_z - min_z, 0.350))
# Find best fit for extracted measurements
coeffs = self._find_least_squares(data)
z_contact, freq_contact, depress_slope, slope, slope2 = coeffs
reactor.pause(0.)
sps = self._sensor_helper.get_samples_per_second()
contact_slope_delta_per_sample = (depress_slope - slope) * speed / sps
if contact_slope_delta_per_sample < self._current_tap_threshold:
self._error_detect("insufficient slope delta (%.6f vs %.6f)"
% (contact_slope_delta_per_sample,
self._current_tap_threshold))
if slope >= 0. or slope2 < 0.:
self._error_detect("invalid free air slope (s=%.6f s2=%.6f)"
% (slope, slope2))
if z_contact - min_z < 0.030 or z_contact - min_z > 0.250:
self._error_detect("invalid depress distance (%.6f vs %.6f:%.6f)"
% (z_contact - min_z, 0.030, 0.250))
# Report probe position
trig_idx = len(data)-1
while trig_idx > 0 and data[trig_idx-1][1][2] > z_contact:
trig_idx -= 1
trig_pos = data[trig_idx][1]
adj_z_contact = z_contact - self._tap_z_offset
return manual_probe.ProbeResult(trig_pos[0], trig_pos[1], adj_z_contact,
trig_pos[0], trig_pos[1], trig_pos[2])
# Probe session interface
def start_probe_session(self, gcmd):
self._prep_trigger_analog_tap(gcmd)
self._gather = EddyGatherSamples(self._printer, self._sensor_helper)
return self
def run_probe(self, gcmd):
toolhead = self._printer.lookup_object('toolhead')
pos = toolhead.get_position()
pos[2] = self._z_min_position
params = self._param_helper.get_probe_params(gcmd)
speed = params['probe_speed']
lift_speed = params['lift_speed']
lift_dist = gcmd.get_float('SAMPLE_RETRACT_DIST', 4., above=0.)
# Perform probing move
phoming = self._printer.lookup_object('homing')
trig_pos = phoming.probing_move(self._trigger_analog, pos, speed)
# Perform lifting move
haltpos = toolhead.get_position()
haltpos[2] += lift_dist
retract_start_time = toolhead.get_last_move_time()
toolhead.manual_move(haltpos, lift_speed)
# Extract retract samples
start_time = retract_start_time - 0.010
end_time = retract_start_time + 0.150
self._gather.add_probe_request(self._analyze_pullback, start_time,
end_time, start_time, end_time, speed)
def pull_probed_results(self):
return self._gather.pull_probed()
def end_probe_session(self):
self._gather.finish()
self._gather = None
# Implementing probing with "METHOD=scan"
class EddyScanningProbe:
def __init__(self, config, sensor_helper, calibration, probe_offsets):
self._printer = config.get_printer()
self._sensor_helper = sensor_helper
self._calibration = calibration
self._offsets = probe_offsets.get_offsets()
self._gather = None
self._sample_time_delay = 0.050
self._sample_time = 0.
self._is_rapid = False
self._printer.register_event_handler("gcode:command_error",
self._handle_command_error)
def _handle_command_error(self):
if self._gather is not None:
self.end_probe_session()
def _lookup_toolhead_pos(self, pos_time):
toolhead = self._printer.lookup_object('toolhead')
kin = toolhead.get_kinematics()
kin_spos = {s.get_name(): s.mcu_to_commanded_position(
s.get_past_mcu_position(pos_time))
for s in kin.get_steppers()}
return kin.calc_position(kin_spos)
def _analyze_scan(self, measures, pos_time):
toolhead_pos = self._lookup_toolhead_pos(pos_time)
return probe_results_from_avg(measures, toolhead_pos,
self._calibration, self._offsets)
def _rapid_lookahead_cb(self, printtime):
start_time = printtime - self._sample_time / 2
end_time = start_time + self._sample_time
self._gather.add_probe_request(self._analyze_scan, start_time, end_time,
printtime)
# Probe session interface
def start_probe_session(self, gcmd):
self._calibration.verify_calibrated()
self._gather = EddyGatherSamples(self._printer, self._sensor_helper)
self._sample_time = gcmd.get_float("SAMPLE_TIME", 0.100, above=0.0)
self._is_rapid = gcmd.get("METHOD", "scan").lower() == 'rapid_scan'
return self
def run_probe(self, gcmd):
toolhead = self._printer.lookup_object("toolhead")
if self._is_rapid:
toolhead.register_lookahead_callback(self._rapid_lookahead_cb)
return
printtime = toolhead.get_last_move_time()
toolhead.dwell(self._sample_time_delay + self._sample_time)
start_time = printtime + self._sample_time_delay
end_time = start_time + self._sample_time
self._gather.add_probe_request(self._analyze_scan, start_time, end_time,
start_time)
def pull_probed_results(self):
if self._is_rapid:
# Flush lookahead (so all lookahead callbacks are invoked)
toolhead = self._printer.lookup_object("toolhead")
toolhead.get_last_move_time()
results = self._gather.pull_probed()
# Allow axis_twist_compensation to update results
self._printer.send_event("probe:update_results", results)
return results
def end_probe_session(self):
self._gather.finish()
self._gather = None
# Eddy specific ProbeOffsets class (does not store z_offset)
class EddyProbeOffsets:
def __init__(self, config):
self.x_offset = config.getfloat('x_offset', 0.)
self.y_offset = config.getfloat('y_offset', 0.)
def get_offsets(self, gcmd=None):
return self.x_offset, self.y_offset, 0.
# Wrapper around ProbeParameterHelper
class EddyParameterHelper:
def __init__(self, config):
self._param_helper = probe.ProbeParameterHelper(config)
def get_probe_params(self, gcmd=None):
method = None
if gcmd is not None:
method = gcmd.get('METHOD', '').lower()
if method not in ['scan', 'rapid_scan', 'tap']:
return self._param_helper.get_probe_params(gcmd)
probe_speed = gcmd.get_float("PROBE_SPEED", 5.0, above=0.)
lift_speed = gcmd.get_float("LIFT_SPEED", 5.0, above=0.)
samples = gcmd.get_int("SAMPLES", 1, minval=1)
samp_retract_dist = 0.
samp_tolerance = gcmd.get_float("SAMPLES_TOLERANCE", 0.100, minval=0.)
samp_retries = gcmd.get_int("SAMPLES_TOLERANCE_RETRIES", 0, minval=0)
samples_result = gcmd.get("SAMPLES_RESULT", 'average')
return {'probe_speed': probe_speed,
'lift_speed': lift_speed,
'samples': samples,
'sample_retract_dist': samp_retract_dist,
'samples_tolerance': samp_tolerance,
'samples_tolerance_retries': samp_retries,
'samples_result': samples_result}
# Main "printer object"
class PrinterEddyProbe:
def __init__(self, config):
self.printer = config.get_printer()
self.calibration = EddyCalibration(config)
# Sensor type
sensors = { "ldc1612": ldc1612.LDC1612 }
sensor_type = config.getchoice('sensor_type', {s: s for s in sensors})
self.sensor_helper = sensors[sensor_type](config, self.calibration)
# Create trigger_analog interface
trig_analog = trigger_analog.MCU_trigger_analog(self.sensor_helper)
probe.LookupZSteppers(config, trig_analog.get_dispatch().add_stepper)
# Basic probe requests
self.probe_offsets = EddyProbeOffsets(config)
self.param_helper = EddyParameterHelper(config)
self.eddy_descend = EddyDescend(
config, self.sensor_helper, self.calibration, self.probe_offsets,
self.param_helper, trig_analog)
# Create wrapper to support Z homing with probe
mcu_probe = EddyEndstopWrapper(self.sensor_helper, self.eddy_descend)
probe.HomingViaProbeHelper(config, mcu_probe,
self.probe_offsets, self.param_helper)
# Probing via "tap" interface
self.eddy_tap = EddyTap(config, self.sensor_helper,
self.param_helper, trig_analog)
# Probing via "scan" and "rapid_scan" requests
self.eddy_scan = EddyScanningProbe(config, self.sensor_helper,
self.calibration, self.probe_offsets)
# Register with main probe interface
self.cmd_helper = probe.ProbeCommandHelper(config, self,
can_set_z_offset=False)
self.probe_session = probe.ProbeSessionHelper(
config, self.param_helper, self._start_descend_wrapper)
self.printer.add_object('probe', self)
def add_client(self, cb):
self.sensor_helper.add_client(cb)
def get_probe_params(self, gcmd=None):
return self.param_helper.get_probe_params(gcmd)
def get_offsets(self, gcmd=None):
if gcmd is not None and gcmd.get('METHOD', '').lower() == "tap":
return (0., 0., 0.)
return self.probe_offsets.get_offsets(gcmd)
def get_status(self, eventtime):
return self.cmd_helper.get_status(eventtime)
def _start_descend_wrapper(self, gcmd):
method = gcmd.get('METHOD', 'automatic').lower()
if method == "tap":
return self.eddy_tap.start_probe_session(gcmd)
return self.eddy_descend.start_probe_session(gcmd)
def start_probe_session(self, gcmd):
method = gcmd.get('METHOD', 'automatic').lower()
if method in ('scan', 'rapid_scan'):
return self.eddy_scan.start_probe_session(gcmd)
# For "tap" and normal, probe_session can average multiple attempts
return self.probe_session.start_probe_session(gcmd)
def register_drift_compensation(self, comp):
self.calibration.register_drift_compensation(comp)
class DummyDriftCompensation:
def get_temperature(self):
return 0.
def note_z_calibration_start(self):
pass
def note_z_calibration_finish(self):
pass
def adjust_freq(self, freq, temp=None):
return freq
def unadjust_freq(self, freq, temp=None):
return freq
def load_config_prefix(config):
return PrinterEddyProbe(config)