From dd2a5bf6370b16a12162eef197ce87b70e9a6a4b Mon Sep 17 00:00:00 2001 From: Robert Schauklies Date: Sat, 12 Jul 2025 20:12:18 +0200 Subject: [PATCH] adds code from: https://github.com/JeroenDelcour/spincoater without mods --- src/boot.py | 0 src/config.json | 10 ++ src/dshot.py | 113 ++++++++++++++++ src/main.py | 299 ++++++++++++++++++++++++++++++++++++++++++ src/pid.py | 256 ++++++++++++++++++++++++++++++++++++ src/rotary.py | 170 ++++++++++++++++++++++++ src/rotary_irq_esp.py | 76 +++++++++++ src/ssd1306.py | 163 +++++++++++++++++++++++ 8 files changed, 1087 insertions(+) create mode 100644 src/boot.py create mode 100644 src/config.json create mode 100644 src/dshot.py create mode 100644 src/main.py create mode 100644 src/pid.py create mode 100644 src/rotary.py create mode 100644 src/rotary_irq_esp.py create mode 100644 src/ssd1306.py diff --git a/src/boot.py b/src/boot.py new file mode 100644 index 0000000..e69de29 diff --git a/src/config.json b/src/config.json new file mode 100644 index 0000000..d5832df --- /dev/null +++ b/src/config.json @@ -0,0 +1,10 @@ +{ + "PID": { + "Kp": 1e-5, + "Ki": 1e-4, + "Kd": 0 + }, + "deposit_rpm": 1000, + "coating_rpm": 2000, + "coating_time": 10 +} diff --git a/src/dshot.py b/src/dshot.py new file mode 100644 index 0000000..3030850 --- /dev/null +++ b/src/dshot.py @@ -0,0 +1,113 @@ +import time + +from esp32 import RMT + + +class Dshot: + def __init__(self, pin): + # https://brushlesswhoop.com/dshot-and-bidirectional-dshot/ + # clock freq = 80 # Mhz + # clock divider = 7 + # pulse time = 1 / (80 / 7) = 0.0875 us + # bit duration = 3.33 us + # T1H = 2.50 us + # T0H = 1.25 us + self._ON_PULSES = 57 # number of pulses for an ON bit + self._OFF_PULSES = 29 # number of pulses for an OFF bit + self._BIT_PULSES = 76 # total number of pulses per bit + self._PAUSE_PULSES = 21 # pause after sending packet + + self.rmt = RMT(0, pin=pin, clock_div=7) + + self._arm() + self._enable_telemetry() + + def _arm(self, duration=2): + """ + Send arming sequence. + + Args: + duration: How long to send the arming sequence, in seconds. + """ + t0 = time.time() + while time.time() - t0 < duration: + self.set_throttle(0) + # time.sleep(0.01) + + def _enable_telemetry(self): + for i in range(6): + value = self._create_packet(33) + self._send(value) + for i in range(6): + value = self._create_packet(34) + self._send(value) + for i in range(6): + value = self._create_packet(35) + self._send(value) + for i in range(6): + value = self._create_packet(12) + self._send(value) + time.sleep(0.040) + + # for i in range(10): + # self._send(self._create_packet(6)) + # time.sleep(0.012) + + # self._send(self._create_packet(42)) + # self._send(self._create_packet(43)) + # self._send(self._create_packet(44)) + # self._send(self._create_packet(45)) + # self._send(self._create_packet(46)) + # self._send(self._create_packet(47)) + + # beep + # time.sleep(0.050) + # for i in range(5): + # self._send(self._create_packet(i + 1)) + # time.sleep(0.250) + + def _encode_throttle(self, value): + # 11 bit throttle: 2048 possible values. + # 0 is reserved for disarmed. 1-47 are reserved for special commands. + # Leaving 48 to 2047 (2000 steps) for the actual throttle value + value = min(max(value, 0.0), 1.0) # clamp to between 0 and 1 + if value == 0: + value = 0 # disarmed + else: + value = int(value * 1999 + 48) + return value + + def set_throttle(self, value: float, telemetry=True): + """ + Set throttle to a value between 0 and 1. + """ + value = self._encode_throttle(value) + value = self._create_packet(value) + self._send(value) + + def _create_packet(self, value, telemetry=True): + # add telemetry bit + value = (value << 1) | telemetry + + # add CRC (Cyclic Redundancy Check) + crc = (value ^ (value >> 4) ^ (value >> 8)) & 0x0F + value = (value << 4) | crc + + return value + + def _send(self, value): + """ + Send value to ESC. + """ + duration = [] + + for i in reversed(range(16)): + bit = (value & (2**i)) == 2**i # select bit + if bit == 1: + duration += [self._ON_PULSES, self._BIT_PULSES - self._ON_PULSES] + else: + duration += [self._OFF_PULSES, self._BIT_PULSES - self._OFF_PULSES] + + duration[-1] += self._PAUSE_PULSES + + self.rmt.write_pulses(duration, True) diff --git a/src/main.py b/src/main.py new file mode 100644 index 0000000..b6709fa --- /dev/null +++ b/src/main.py @@ -0,0 +1,299 @@ +from machine import Pin, I2C, Timer, UART +import uasyncio +import json + +import ssd1306 +from rotary_irq_esp import RotaryIRQ +from dshot import Dshot +from pid import PID + + +def splash(): + # display.fill(0) + # display.fill_rect(0, 0, 32, 32, 1) + # display.fill_rect(2, 2, 28, 28, 0) + # display.vline(9, 8, 22, 1) + # display.vline(16, 2, 22, 1) + # display.vline(23, 8, 22, 1) + # display.fill_rect(26, 24, 2, 4, 1) + # display.text("MicroPython", 40, 0, 1) + # display.text("SSD1306", 40, 12, 1) + # display.text("OLED 128x64", 40, 24, 1) + + display.fill(0) + display.text("SPIN", 34, 4, 1) + display.text("COATER", 50, 14, 1) + + display.show() + + +def start_view(state, rotary): + display.text("SPIN", 34, 4, 1) + display.text("COATER", 50, 14, 1) + + before = "> " if rotary.value() == 0 else " " + display.text(before + "Edit", 12, 36, 1) + before = "> " if rotary.value() == 1 else " " + display.text(before + "Start", 12, 46, 1) + + +def draw_edit_menu(state, rotary): + display.text("Deposit speed:", 0, 0, 1) + display.text("{: >{w}} RPM".format(config["deposit_rpm"], w=5), 56, 10, 1) + display.text("Coating speed:", 0, 21, 1) + display.text("{: >{w}} RPM".format(config["coating_rpm"], w=5), 56, 31, 1) + display.text("Coating time:", 0, 42, 1) + display.text("{: >{w}} sec".format(config["coating_time"], w=5), 56, 52, 1) + + +def edit_deposit_view(state, rotary): + config["deposit_rpm"] = rotary.value() * 100 + draw_edit_menu(state, rotary) + display.text(">", 40, 10, 1) + + +def edit_coating_rpm_view(state, rotary): + config["coating_rpm"] = rotary.value() * 100 + draw_edit_menu(state, rotary) + display.text(">", 40, 32, 1) + + +def edit_coating_time_view(state, rotary): + config["coating_time"] = rotary.value() + draw_edit_menu(state, rotary) + display.text(">", 40, 54, 1) + + +def draw_rpm(rpm): + display.text("RPM:{: >{w}.0f}".format(rpm, w=5), 30, 27, 1) + + +def deposit_view(state, rotary): + display.fill_rect(0, 0, 127, 14, 1) + display.text("Deposit", 36, 3, 0) + draw_rpm(state["rpm"]) + display.text("Press to", 32, 42, 1) + display.text("continue", 32, 52, 1) + + +def coating_view(state, rotary): + display.fill_rect(0, 0, 127, 14, 1) + display.text("Coating", 36, 3, 0) + draw_rpm(state["rpm"]) + display.text("{: >{w}} sec".format(state["timer"], w=4), 30, 48, 1) + + +def decode_ESC_telemetry(data, motor_poles=14): + if len(data) > 10: + # use latest telemetry + data = data[-10:] + + temperature = int(data[0]) # degrees Celsius + voltage = int((data[1] << 8) | data[2]) * 0.01 # Volt + current = ( + int((data[3] << 8) | data[4]) * 0.01 + ) # Amps, only available if the ESC has a current meter + consumption = int( + (data[5] << 8) | data[6] + ) # mAh, only available if the ESC has a current meter + erpm = int((data[7] << 8) | data[8]) * 100 + rpm = erpm / (motor_poles / 2) + crc = data[9] + + # print(" Temp (C):", temperature) + # print(" Voltage (V):", voltage) + # print(" Current (A):", current) + # print("Consumption (mAh):", consumption) + # print(" Erpm:", erpm) + # print(" RPM:", rpm) + # print(" CRC:", crc) + # print() + + return temperature, voltage, current, consumption, erpm, rpm + + +async def update_display(): + global state + global rotary + while True: + display.fill(0) + state["view"](state, rotary) + display.show() + await uasyncio.sleep_ms(33) + + +async def update_motor(): + global state + dshot = Dshot(pin=Pin(18)) + rpm_pid = PID( + Kp=config["PID"]["Kp"], + Ki=config["PID"]["Ki"], + Kd=config["PID"]["Kd"], + setpoint=0, + sample_time=None, + output_limits=(0.0, 1.0), + # proportional_on_measurement=True, + ) + while True: + rpm_pid.setpoint = state["target_rpm"] + + # read ESC telemetry + if uart.any() >= 10: + telemetry = decode_ESC_telemetry(uart.read()) + if telemetry is not None: + state["rpm"] = telemetry[5] + throttle = rpm_pid(state["rpm"]) + # print( + # "Throttle:", + # throttle, + # "pid components:", + # rpm_pid.components, + # "RPM:", + # state["rpm"], + # ) + + if state["target_rpm"] == 0 and state["rpm"] < 1000: + throttle = 0 + rpm_pid.reset() + dshot.set_throttle(throttle) + + await uasyncio.sleep_ms(1) + + +def debounce_button(p): + p.irq(trigger=Pin.IRQ_FALLING, handler=None) # remove irq + timer0 = Timer(0) + timer0.init(period=20, mode=Timer.ONE_SHOT, callback=lambda t: on_button_press(p)) + + +def on_button_press(p): + p.irq(trigger=Pin.IRQ_FALLING, handler=debounce_button) # restore irq + if p.value() == 1: # debounce + return + global state + global config + global rotary + if state["view"] == start_view: + if rotary.value() == 0: + state["view"] = edit_deposit_view + rotary.set( + min_val=0, + max_val=1000, + range_mode=RotaryIRQ.RANGE_BOUNDED, + value=int(0.01 * config["deposit_rpm"]), + ) + return + if rotary.value() == 1: + state["view"] = deposit_view + state["target_rpm"] = config["deposit_rpm"] + return + if state["view"] == edit_deposit_view: + state["view"] = edit_coating_rpm_view + rotary.set( + min_val=0, + max_val=1000, + range_mode=RotaryIRQ.RANGE_BOUNDED, + value=int(0.01 * config["coating_rpm"]), + ) + return + if state["view"] == edit_coating_rpm_view: + state["view"] = edit_coating_time_view + rotary.set( + min_val=0, + max_val=9999, + range_mode=RotaryIRQ.RANGE_BOUNDED, + value=config["coating_time"], + ) + return + if state["view"] == edit_coating_time_view: + save_config() + rotary.set(min_val=0, max_val=1, range_mode=RotaryIRQ.RANGE_BOUNDED, value=0) + state["view"] = start_view + return + if state["view"] == deposit_view: + state["view"] = coating_view + start_coating(state) + return + if state["view"] == coating_view: + stop_coating() + return + + +def start_coating(state): + global timer1 + global timer2 + + state["timer"] = config["coating_time"] + + timer1.init( + period=config["coating_time"] * 1000, + mode=Timer.ONE_SHOT, + callback=lambda t: stop_coating(), + ) + + def decrement_timer(t): + state["timer"] -= 1 + + timer2.init(period=1000, mode=Timer.PERIODIC, callback=decrement_timer) + + # state["throttle"] = 0.10 + state["target_rpm"] = config["coating_rpm"] + + +def stop_coating(): + global state + global rotary + global timer1 + global timer2 + timer1.deinit() + timer2.deinit() + state["target_rpm"] = 0 + rotary.set(min_val=0, max_val=1, range_mode=RotaryIRQ.RANGE_BOUNDED, value=0) + state["view"] = start_view + + +def save_config(): + global config + with open("config.json", "w") as f: + json.dump(config, f) + + +# using default address 0x3c +i2c = I2C(1, sda=Pin(21), scl=Pin(22)) +display = ssd1306.SSD1306_I2C(128, 64, i2c) +display.rotate(0) + +timer1 = Timer(1) +timer2 = Timer(2) + + +splash() + +rotary = RotaryIRQ( + pin_num_clk=14, + pin_num_dt=13, + min_val=0, + max_val=1, + range_mode=RotaryIRQ.RANGE_BOUNDED, + pull_up=True, +) + +button = Pin(19, Pin.IN, Pin.PULL_UP) +button.irq(trigger=Pin.IRQ_FALLING, handler=on_button_press) + +uart = UART(1, baudrate=115200, rx=5) # to receive ESC telemetry + +state = { + "view": start_view, + "rpm": 0, + "target_rpm": 0, + "timer": 0, +} + +with open("config.json", "r") as f: + config = json.load(f) + +event_loop = uasyncio.get_event_loop() +event_loop.create_task(update_display()) +event_loop.create_task(update_motor()) +event_loop.run_forever() diff --git a/src/pid.py b/src/pid.py new file mode 100644 index 0000000..92ef81a --- /dev/null +++ b/src/pid.py @@ -0,0 +1,256 @@ +# adapted from https://github.com/m-lundberg/simple-pid + + +import time + + +def _clamp(value, limits): + lower, upper = limits + if value is None: + return None + elif (upper is not None) and (value > upper): + return upper + elif (lower is not None) and (value < lower): + return lower + return value + + +class PID(object): + """A simple PID controller.""" + + def __init__( + self, + Kp=1.0, + Ki=0.0, + Kd=0.0, + setpoint=0, + sample_time=0.01, + output_limits=(None, None), + auto_mode=True, + proportional_on_measurement=False, + differetial_on_measurement=True, + error_map=None, + ): + """ + Initialize a new PID controller. + + :param Kp: The value for the proportional gain Kp + :param Ki: The value for the integral gain Ki + :param Kd: The value for the derivative gain Kd + :param setpoint: The initial setpoint that the PID will try to achieve + :param sample_time: The time in seconds which the controller should wait before generating + a new output value. The PID works best when it is constantly called (eg. during a + loop), but with a sample time set so that the time difference between each update is + (close to) constant. If set to None, the PID will compute a new output value every time + it is called. + :param output_limits: The initial output limits to use, given as an iterable with 2 + elements, for example: (lower, upper). The output will never go below the lower limit + or above the upper limit. Either of the limits can also be set to None to have no limit + in that direction. Setting output limits also avoids integral windup, since the + integral term will never be allowed to grow outside of the limits. + :param auto_mode: Whether the controller should be enabled (auto mode) or not (manual mode) + :param proportional_on_measurement: Whether the proportional term should be calculated on + the input directly rather than on the error (which is the traditional way). Using + proportional-on-measurement avoids overshoot for some types of systems. + :param differetial_on_measurement: Whether the differential term should be calculated on + the input directly rather than on the error (which is the traditional way). + :param error_map: Function to transform the error value in another constrained value. + """ + self.Kp, self.Ki, self.Kd = Kp, Ki, Kd + self.setpoint = setpoint + self.sample_time = sample_time + + self._min_output, self._max_output = None, None + self._auto_mode = auto_mode + self.proportional_on_measurement = proportional_on_measurement + self.differetial_on_measurement = differetial_on_measurement + self.error_map = error_map + + self._proportional = 0 + self._integral = 0 + self._derivative = 0 + + self._last_time = None + self._last_output = None + self._last_error = None + self._last_input = None + + # try: + # # Get monotonic time to ensure that time deltas are always positive + # self.time_fn = time.monotonic + # except AttributeError: + # # time.monotonic() not available (using python < 3.3), fallback to time.time() + # self.time_fn = time.time + self.time_fn = lambda: 0.001 * time.ticks_ms() + + self.output_limits = output_limits + self.reset() + + def __call__(self, input_, dt=None): + """ + Update the PID controller. + + Call the PID controller with *input_* and calculate and return a control output if + sample_time seconds has passed since the last update. If no new output is calculated, + return the previous output instead (or None if no value has been calculated yet). + + :param dt: If set, uses this value for timestep instead of real time. This can be used in + simulations when simulation time is different from real time. + """ + if not self.auto_mode: + return self._last_output + + now = self.time_fn() + if dt is None: + dt = now - self._last_time if (now - self._last_time) else 1e-16 + elif dt <= 0: + raise ValueError('dt has negative value {}, must be positive'.format(dt)) + + if self.sample_time is not None and dt < self.sample_time and self._last_output is not None: + # Only update every sample_time seconds + return self._last_output + + # Compute error terms + error = self.setpoint - input_ + d_input = input_ - (self._last_input if (self._last_input is not None) else input_) + d_error = error - (self._last_error if (self._last_error is not None) else error) + + # Check if must map the error + if self.error_map is not None: + error = self.error_map(error) + + # Compute the proportional term + if not self.proportional_on_measurement: + # Regular proportional-on-error, simply set the proportional term + self._proportional = self.Kp * error + else: + # Add the proportional error on measurement to error_sum + self._proportional -= self.Kp * d_input + + # Compute integral and derivative terms + self._integral += self.Ki * error * dt + self._integral = _clamp(self._integral, self.output_limits) # Avoid integral windup + + if self.differetial_on_measurement: + self._derivative = -self.Kd * d_input / dt + else: + self._derivative = self.Kd * d_error / dt + + # Compute final output + output = self._proportional + self._integral + self._derivative + output = _clamp(output, self.output_limits) + + # Keep track of state + self._last_output = output + self._last_input = input_ + self._last_error = error + self._last_time = now + + return output + + def __repr__(self): + return ( + '{self.__class__.__name__}(' + 'Kp={self.Kp!r}, Ki={self.Ki!r}, Kd={self.Kd!r}, ' + 'setpoint={self.setpoint!r}, sample_time={self.sample_time!r}, ' + 'output_limits={self.output_limits!r}, auto_mode={self.auto_mode!r}, ' + 'proportional_on_measurement={self.proportional_on_measurement!r}, ' + 'differetial_on_measurement={self.differetial_on_measurement!r}, ' + 'error_map={self.error_map!r}' + ')' + ).format(self=self) + + @property + def components(self): + """ + The P-, I- and D-terms from the last computation as separate components as a tuple. Useful + for visualizing what the controller is doing or when tuning hard-to-tune systems. + """ + return self._proportional, self._integral, self._derivative + + @property + def tunings(self): + """The tunings used by the controller as a tuple: (Kp, Ki, Kd).""" + return self.Kp, self.Ki, self.Kd + + @tunings.setter + def tunings(self, tunings): + """Set the PID tunings.""" + self.Kp, self.Ki, self.Kd = tunings + + @property + def auto_mode(self): + """Whether the controller is currently enabled (in auto mode) or not.""" + return self._auto_mode + + @auto_mode.setter + def auto_mode(self, enabled): + """Enable or disable the PID controller.""" + self.set_auto_mode(enabled) + + def set_auto_mode(self, enabled, last_output=None): + """ + Enable or disable the PID controller, optionally setting the last output value. + + This is useful if some system has been manually controlled and if the PID should take over. + In that case, disable the PID by setting auto mode to False and later when the PID should + be turned back on, pass the last output variable (the control variable) and it will be set + as the starting I-term when the PID is set to auto mode. + + :param enabled: Whether auto mode should be enabled, True or False + :param last_output: The last output, or the control variable, that the PID should start + from when going from manual mode to auto mode. Has no effect if the PID is already in + auto mode. + """ + if enabled and not self._auto_mode: + # Switching from manual mode to auto, reset + self.reset() + + self._integral = last_output if (last_output is not None) else 0 + self._integral = _clamp(self._integral, self.output_limits) + + self._auto_mode = enabled + + @property + def output_limits(self): + """ + The current output limits as a 2-tuple: (lower, upper). + + See also the *output_limits* parameter in :meth:`PID.__init__`. + """ + return self._min_output, self._max_output + + @output_limits.setter + def output_limits(self, limits): + """Set the output limits.""" + if limits is None: + self._min_output, self._max_output = None, None + return + + min_output, max_output = limits + + if (None not in limits) and (max_output < min_output): + raise ValueError('lower limit must be less than upper limit') + + self._min_output = min_output + self._max_output = max_output + + self._integral = _clamp(self._integral, self.output_limits) + self._last_output = _clamp(self._last_output, self.output_limits) + + def reset(self): + """ + Reset the PID controller internals. + + This sets each term to 0 as well as clearing the integral, the last output and the last + input (derivative calculation). + """ + self._proportional = 0 + self._integral = 0 + self._derivative = 0 + + self._integral = _clamp(self._integral, self.output_limits) + + self._last_time = self.time_fn() + self._last_output = None + self._last_input = None diff --git a/src/rotary.py b/src/rotary.py new file mode 100644 index 0000000..0a566c2 --- /dev/null +++ b/src/rotary.py @@ -0,0 +1,170 @@ +# The MIT License (MIT) +# Copyright (c) 2022 Mike Teachman +# https://opensource.org/licenses/MIT + +# Platform-independent MicroPython code for the rotary encoder module + +# Documentation: +# https://github.com/MikeTeachman/micropython-rotary + +import micropython + +_DIR_CW = const(0x10) # Clockwise step +_DIR_CCW = const(0x20) # Counter-clockwise step + +# Rotary Encoder States +_R_START = const(0x0) +_R_CW_1 = const(0x1) +_R_CW_2 = const(0x2) +_R_CW_3 = const(0x3) +_R_CCW_1 = const(0x4) +_R_CCW_2 = const(0x5) +_R_CCW_3 = const(0x6) +_R_ILLEGAL = const(0x7) + +_transition_table = [ + + # |------------- NEXT STATE -------------| |CURRENT STATE| + # CLK/DT CLK/DT CLK/DT CLK/DT + # 00 01 10 11 + [_R_START, _R_CCW_1, _R_CW_1, _R_START], # _R_START + [_R_CW_2, _R_START, _R_CW_1, _R_START], # _R_CW_1 + [_R_CW_2, _R_CW_3, _R_CW_1, _R_START], # _R_CW_2 + [_R_CW_2, _R_CW_3, _R_START, _R_START | _DIR_CW], # _R_CW_3 + [_R_CCW_2, _R_CCW_1, _R_START, _R_START], # _R_CCW_1 + [_R_CCW_2, _R_CCW_1, _R_CCW_3, _R_START], # _R_CCW_2 + [_R_CCW_2, _R_START, _R_CCW_3, _R_START | _DIR_CCW], # _R_CCW_3 + [_R_START, _R_START, _R_START, _R_START]] # _R_ILLEGAL + +_transition_table_half_step = [ + [_R_CW_3, _R_CW_2, _R_CW_1, _R_START], + [_R_CW_3 | _DIR_CCW, _R_START, _R_CW_1, _R_START], + [_R_CW_3 | _DIR_CW, _R_CW_2, _R_START, _R_START], + [_R_CW_3, _R_CCW_2, _R_CCW_1, _R_START], + [_R_CW_3, _R_CW_2, _R_CCW_1, _R_START | _DIR_CW], + [_R_CW_3, _R_CCW_2, _R_CW_3, _R_START | _DIR_CCW], + [_R_START, _R_START, _R_START, _R_START], + [_R_START, _R_START, _R_START, _R_START]] + +_STATE_MASK = const(0x07) +_DIR_MASK = const(0x30) + + +def _wrap(value, incr, lower_bound, upper_bound): + range = upper_bound - lower_bound + 1 + value = value + incr + + if value < lower_bound: + value += range * ((lower_bound - value) // range + 1) + + return lower_bound + (value - lower_bound) % range + + +def _bound(value, incr, lower_bound, upper_bound): + return min(upper_bound, max(lower_bound, value + incr)) + + +def _trigger(rotary_instance): + for listener in rotary_instance._listener: + listener() + + +class Rotary(object): + + RANGE_UNBOUNDED = const(1) + RANGE_WRAP = const(2) + RANGE_BOUNDED = const(3) + + def __init__(self, min_val, max_val, reverse, range_mode, half_step, invert): + self._min_val = min_val + self._max_val = max_val + self._reverse = -1 if reverse else 1 + self._range_mode = range_mode + self._value = min_val + self._state = _R_START + self._half_step = half_step + self._invert = invert + self._listener = [] + + def set(self, value=None, min_val=None, + max_val=None, reverse=None, range_mode=None): + # disable DT and CLK pin interrupts + self._hal_disable_irq() + + if value is not None: + self._value = value + if min_val is not None: + self._min_val = min_val + if max_val is not None: + self._max_val = max_val + if reverse is not None: + self._reverse = -1 if reverse else 1 + if range_mode is not None: + self._range_mode = range_mode + self._state = _R_START + + # enable DT and CLK pin interrupts + self._hal_enable_irq() + + def value(self): + return self._value + + def reset(self): + self._value = 0 + + def close(self): + self._hal_close() + + def add_listener(self, l): + self._listener.append(l) + + def remove_listener(self, l): + if l not in self._listener: + raise ValueError('{} is not an installed listener'.format(l)) + self._listener.remove(l) + + def _process_rotary_pins(self, pin): + old_value = self._value + clk_dt_pins = (self._hal_get_clk_value() << + 1) | self._hal_get_dt_value() + + if self._invert: + clk_dt_pins = ~clk_dt_pins & 0x03 + + # Determine next state + if self._half_step: + self._state = _transition_table_half_step[self._state & + _STATE_MASK][clk_dt_pins] + else: + self._state = _transition_table[self._state & + _STATE_MASK][clk_dt_pins] + direction = self._state & _DIR_MASK + + incr = 0 + if direction == _DIR_CW: + incr = 1 + elif direction == _DIR_CCW: + incr = -1 + + incr *= self._reverse + + if self._range_mode == self.RANGE_WRAP: + self._value = _wrap( + self._value, + incr, + self._min_val, + self._max_val) + elif self._range_mode == self.RANGE_BOUNDED: + self._value = _bound( + self._value, + incr, + self._min_val, + self._max_val) + else: + self._value = self._value + incr + + try: + if old_value != self._value and len(self._listener) != 0: + micropython.schedule(_trigger, self) + except: + pass diff --git a/src/rotary_irq_esp.py b/src/rotary_irq_esp.py new file mode 100644 index 0000000..d123dbe --- /dev/null +++ b/src/rotary_irq_esp.py @@ -0,0 +1,76 @@ +# The MIT License (MIT) +# Copyright (c) 2020 Mike Teachman +# https://opensource.org/licenses/MIT + +# Platform-specific MicroPython code for the rotary encoder module +# ESP8266/ESP32 implementation + +# Documentation: +# https://github.com/MikeTeachman/micropython-rotary + +from machine import Pin +from rotary import Rotary +from sys import platform + +_esp8266_deny_pins = [16] + + +class RotaryIRQ(Rotary): + + def __init__(self, pin_num_clk, pin_num_dt, min_val=0, max_val=10, + reverse=False, range_mode=Rotary.RANGE_UNBOUNDED, pull_up=False, half_step=False, invert=False): + + if platform == 'esp8266': + if pin_num_clk in _esp8266_deny_pins: + raise ValueError( + '%s: Pin %d not allowed. Not Available for Interrupt: %s' % + (platform, pin_num_clk, _esp8266_deny_pins)) + if pin_num_dt in _esp8266_deny_pins: + raise ValueError( + '%s: Pin %d not allowed. Not Available for Interrupt: %s' % + (platform, pin_num_dt, _esp8266_deny_pins)) + + super().__init__(min_val, max_val, reverse, range_mode, half_step, invert) + + if pull_up == True: + self._pin_clk = Pin(pin_num_clk, Pin.IN, Pin.PULL_UP) + self._pin_dt = Pin(pin_num_dt, Pin.IN, Pin.PULL_UP) + else: + self._pin_clk = Pin(pin_num_clk, Pin.IN) + self._pin_dt = Pin(pin_num_dt, Pin.IN) + + self._enable_clk_irq(self._process_rotary_pins) + self._enable_dt_irq(self._process_rotary_pins) + + def _enable_clk_irq(self, callback=None): + self._pin_clk.irq( + trigger=Pin.IRQ_RISING | Pin.IRQ_FALLING, + handler=callback) + + def _enable_dt_irq(self, callback=None): + self._pin_dt.irq( + trigger=Pin.IRQ_RISING | Pin.IRQ_FALLING, + handler=callback) + + def _disable_clk_irq(self): + self._pin_clk.irq(handler=None) + + def _disable_dt_irq(self): + self._pin_dt.irq(handler=None) + + def _hal_get_clk_value(self): + return self._pin_clk.value() + + def _hal_get_dt_value(self): + return self._pin_dt.value() + + def _hal_enable_irq(self): + self._enable_clk_irq(self._process_rotary_pins) + self._enable_dt_irq(self._process_rotary_pins) + + def _hal_disable_irq(self): + self._disable_clk_irq() + self._disable_dt_irq() + + def _hal_close(self): + self._hal_disable_irq() diff --git a/src/ssd1306.py b/src/ssd1306.py new file mode 100644 index 0000000..a504cda --- /dev/null +++ b/src/ssd1306.py @@ -0,0 +1,163 @@ +# MicroPython SSD1306 OLED driver, I2C and SPI interfaces + +from micropython import const +import framebuf + + +# register definitions +SET_CONTRAST = const(0x81) +SET_ENTIRE_ON = const(0xA4) +SET_NORM_INV = const(0xA6) +SET_DISP = const(0xAE) +SET_MEM_ADDR = const(0x20) +SET_COL_ADDR = const(0x21) +SET_PAGE_ADDR = const(0x22) +SET_DISP_START_LINE = const(0x40) +SET_SEG_REMAP = const(0xA0) +SET_MUX_RATIO = const(0xA8) +SET_IREF_SELECT = const(0xAD) +SET_COM_OUT_DIR = const(0xC0) +SET_DISP_OFFSET = const(0xD3) +SET_COM_PIN_CFG = const(0xDA) +SET_DISP_CLK_DIV = const(0xD5) +SET_PRECHARGE = const(0xD9) +SET_VCOM_DESEL = const(0xDB) +SET_CHARGE_PUMP = const(0x8D) + +# Subclassing FrameBuffer provides support for graphics primitives +# http://docs.micropython.org/en/latest/pyboard/library/framebuf.html +class SSD1306(framebuf.FrameBuffer): + def __init__(self, width, height, external_vcc): + self.width = width + self.height = height + self.external_vcc = external_vcc + self.pages = self.height // 8 + self.buffer = bytearray(self.pages * self.width) + super().__init__(self.buffer, self.width, self.height, framebuf.MONO_VLSB) + self.init_display() + + def init_display(self): + for cmd in ( + SET_DISP, # display off + # address setting + SET_MEM_ADDR, + 0x00, # horizontal + # resolution and layout + SET_DISP_START_LINE, # start at line 0 + SET_SEG_REMAP | 0x01, # column addr 127 mapped to SEG0 + SET_MUX_RATIO, + self.height - 1, + SET_COM_OUT_DIR | 0x08, # scan from COM[N] to COM0 + SET_DISP_OFFSET, + 0x00, + SET_COM_PIN_CFG, + 0x02 if self.width > 2 * self.height else 0x12, + # timing and driving scheme + SET_DISP_CLK_DIV, + 0x80, + SET_PRECHARGE, + 0x22 if self.external_vcc else 0xF1, + SET_VCOM_DESEL, + 0x30, # 0.83*Vcc + # display + SET_CONTRAST, + 0xFF, # maximum + SET_ENTIRE_ON, # output follows RAM contents + SET_NORM_INV, # not inverted + SET_IREF_SELECT, + 0x30, # enable internal IREF during display on + # charge pump + SET_CHARGE_PUMP, + 0x10 if self.external_vcc else 0x14, + SET_DISP | 0x01, # display on + ): # on + self.write_cmd(cmd) + self.fill(0) + self.show() + + def poweroff(self): + self.write_cmd(SET_DISP) + + def poweron(self): + self.write_cmd(SET_DISP | 0x01) + + def contrast(self, contrast): + self.write_cmd(SET_CONTRAST) + self.write_cmd(contrast) + + def invert(self, invert): + self.write_cmd(SET_NORM_INV | (invert & 1)) + + def rotate(self, rotate): + self.write_cmd(SET_COM_OUT_DIR | ((rotate & 1) << 3)) + self.write_cmd(SET_SEG_REMAP | (rotate & 1)) + + def show(self): + x0 = 0 + x1 = self.width - 1 + if self.width != 128: + # narrow displays use centred columns + col_offset = (128 - self.width) // 2 + x0 += col_offset + x1 += col_offset + self.write_cmd(SET_COL_ADDR) + self.write_cmd(x0) + self.write_cmd(x1) + self.write_cmd(SET_PAGE_ADDR) + self.write_cmd(0) + self.write_cmd(self.pages - 1) + self.write_data(self.buffer) + + +class SSD1306_I2C(SSD1306): + def __init__(self, width, height, i2c, addr=0x3C, external_vcc=False): + self.i2c = i2c + self.addr = addr + self.temp = bytearray(2) + self.write_list = [b"\x40", None] # Co=0, D/C#=1 + super().__init__(width, height, external_vcc) + + def write_cmd(self, cmd): + self.temp[0] = 0x80 # Co=1, D/C#=0 + self.temp[1] = cmd + self.i2c.writeto(self.addr, self.temp) + + def write_data(self, buf): + self.write_list[1] = buf + self.i2c.writevto(self.addr, self.write_list) + + +class SSD1306_SPI(SSD1306): + def __init__(self, width, height, spi, dc, res, cs, external_vcc=False): + self.rate = 10 * 1024 * 1024 + dc.init(dc.OUT, value=0) + res.init(res.OUT, value=0) + cs.init(cs.OUT, value=1) + self.spi = spi + self.dc = dc + self.res = res + self.cs = cs + import time + + self.res(1) + time.sleep_ms(1) + self.res(0) + time.sleep_ms(10) + self.res(1) + super().__init__(width, height, external_vcc) + + def write_cmd(self, cmd): + self.spi.init(baudrate=self.rate, polarity=0, phase=0) + self.cs(1) + self.dc(0) + self.cs(0) + self.spi.write(bytearray([cmd])) + self.cs(1) + + def write_data(self, buf): + self.spi.init(baudrate=self.rate, polarity=0, phase=0) + self.cs(1) + self.dc(1) + self.cs(0) + self.spi.write(buf) + self.cs(1)