This commit is contained in:
Robert Schauklies 2025-07-12 20:12:18 +02:00
parent 5551bbda50
commit dd2a5bf637
8 changed files with 1087 additions and 0 deletions

0
src/boot.py Normal file
View file

10
src/config.json Normal file
View file

@ -0,0 +1,10 @@
{
"PID": {
"Kp": 1e-5,
"Ki": 1e-4,
"Kd": 0
},
"deposit_rpm": 1000,
"coating_rpm": 2000,
"coating_time": 10
}

113
src/dshot.py Normal file
View file

@ -0,0 +1,113 @@
import time
from esp32 import RMT
class Dshot:
def __init__(self, pin):
# https://brushlesswhoop.com/dshot-and-bidirectional-dshot/
# clock freq = 80 # Mhz
# clock divider = 7
# pulse time = 1 / (80 / 7) = 0.0875 us
# bit duration = 3.33 us
# T1H = 2.50 us
# T0H = 1.25 us
self._ON_PULSES = 57 # number of pulses for an ON bit
self._OFF_PULSES = 29 # number of pulses for an OFF bit
self._BIT_PULSES = 76 # total number of pulses per bit
self._PAUSE_PULSES = 21 # pause after sending packet
self.rmt = RMT(0, pin=pin, clock_div=7)
self._arm()
self._enable_telemetry()
def _arm(self, duration=2):
"""
Send arming sequence.
Args:
duration: How long to send the arming sequence, in seconds.
"""
t0 = time.time()
while time.time() - t0 < duration:
self.set_throttle(0)
# time.sleep(0.01)
def _enable_telemetry(self):
for i in range(6):
value = self._create_packet(33)
self._send(value)
for i in range(6):
value = self._create_packet(34)
self._send(value)
for i in range(6):
value = self._create_packet(35)
self._send(value)
for i in range(6):
value = self._create_packet(12)
self._send(value)
time.sleep(0.040)
# for i in range(10):
# self._send(self._create_packet(6))
# time.sleep(0.012)
# self._send(self._create_packet(42))
# self._send(self._create_packet(43))
# self._send(self._create_packet(44))
# self._send(self._create_packet(45))
# self._send(self._create_packet(46))
# self._send(self._create_packet(47))
# beep
# time.sleep(0.050)
# for i in range(5):
# self._send(self._create_packet(i + 1))
# time.sleep(0.250)
def _encode_throttle(self, value):
# 11 bit throttle: 2048 possible values.
# 0 is reserved for disarmed. 1-47 are reserved for special commands.
# Leaving 48 to 2047 (2000 steps) for the actual throttle value
value = min(max(value, 0.0), 1.0) # clamp to between 0 and 1
if value == 0:
value = 0 # disarmed
else:
value = int(value * 1999 + 48)
return value
def set_throttle(self, value: float, telemetry=True):
"""
Set throttle to a value between 0 and 1.
"""
value = self._encode_throttle(value)
value = self._create_packet(value)
self._send(value)
def _create_packet(self, value, telemetry=True):
# add telemetry bit
value = (value << 1) | telemetry
# add CRC (Cyclic Redundancy Check)
crc = (value ^ (value >> 4) ^ (value >> 8)) & 0x0F
value = (value << 4) | crc
return value
def _send(self, value):
"""
Send value to ESC.
"""
duration = []
for i in reversed(range(16)):
bit = (value & (2**i)) == 2**i # select bit
if bit == 1:
duration += [self._ON_PULSES, self._BIT_PULSES - self._ON_PULSES]
else:
duration += [self._OFF_PULSES, self._BIT_PULSES - self._OFF_PULSES]
duration[-1] += self._PAUSE_PULSES
self.rmt.write_pulses(duration, True)

299
src/main.py Normal file
View file

@ -0,0 +1,299 @@
from machine import Pin, I2C, Timer, UART
import uasyncio
import json
import ssd1306
from rotary_irq_esp import RotaryIRQ
from dshot import Dshot
from pid import PID
def splash():
# display.fill(0)
# display.fill_rect(0, 0, 32, 32, 1)
# display.fill_rect(2, 2, 28, 28, 0)
# display.vline(9, 8, 22, 1)
# display.vline(16, 2, 22, 1)
# display.vline(23, 8, 22, 1)
# display.fill_rect(26, 24, 2, 4, 1)
# display.text("MicroPython", 40, 0, 1)
# display.text("SSD1306", 40, 12, 1)
# display.text("OLED 128x64", 40, 24, 1)
display.fill(0)
display.text("SPIN", 34, 4, 1)
display.text("COATER", 50, 14, 1)
display.show()
def start_view(state, rotary):
display.text("SPIN", 34, 4, 1)
display.text("COATER", 50, 14, 1)
before = "> " if rotary.value() == 0 else " "
display.text(before + "Edit", 12, 36, 1)
before = "> " if rotary.value() == 1 else " "
display.text(before + "Start", 12, 46, 1)
def draw_edit_menu(state, rotary):
display.text("Deposit speed:", 0, 0, 1)
display.text("{: >{w}} RPM".format(config["deposit_rpm"], w=5), 56, 10, 1)
display.text("Coating speed:", 0, 21, 1)
display.text("{: >{w}} RPM".format(config["coating_rpm"], w=5), 56, 31, 1)
display.text("Coating time:", 0, 42, 1)
display.text("{: >{w}} sec".format(config["coating_time"], w=5), 56, 52, 1)
def edit_deposit_view(state, rotary):
config["deposit_rpm"] = rotary.value() * 100
draw_edit_menu(state, rotary)
display.text(">", 40, 10, 1)
def edit_coating_rpm_view(state, rotary):
config["coating_rpm"] = rotary.value() * 100
draw_edit_menu(state, rotary)
display.text(">", 40, 32, 1)
def edit_coating_time_view(state, rotary):
config["coating_time"] = rotary.value()
draw_edit_menu(state, rotary)
display.text(">", 40, 54, 1)
def draw_rpm(rpm):
display.text("RPM:{: >{w}.0f}".format(rpm, w=5), 30, 27, 1)
def deposit_view(state, rotary):
display.fill_rect(0, 0, 127, 14, 1)
display.text("Deposit", 36, 3, 0)
draw_rpm(state["rpm"])
display.text("Press to", 32, 42, 1)
display.text("continue", 32, 52, 1)
def coating_view(state, rotary):
display.fill_rect(0, 0, 127, 14, 1)
display.text("Coating", 36, 3, 0)
draw_rpm(state["rpm"])
display.text("{: >{w}} sec".format(state["timer"], w=4), 30, 48, 1)
def decode_ESC_telemetry(data, motor_poles=14):
if len(data) > 10:
# use latest telemetry
data = data[-10:]
temperature = int(data[0]) # degrees Celsius
voltage = int((data[1] << 8) | data[2]) * 0.01 # Volt
current = (
int((data[3] << 8) | data[4]) * 0.01
) # Amps, only available if the ESC has a current meter
consumption = int(
(data[5] << 8) | data[6]
) # mAh, only available if the ESC has a current meter
erpm = int((data[7] << 8) | data[8]) * 100
rpm = erpm / (motor_poles / 2)
crc = data[9]
# print(" Temp (C):", temperature)
# print(" Voltage (V):", voltage)
# print(" Current (A):", current)
# print("Consumption (mAh):", consumption)
# print(" Erpm:", erpm)
# print(" RPM:", rpm)
# print(" CRC:", crc)
# print()
return temperature, voltage, current, consumption, erpm, rpm
async def update_display():
global state
global rotary
while True:
display.fill(0)
state["view"](state, rotary)
display.show()
await uasyncio.sleep_ms(33)
async def update_motor():
global state
dshot = Dshot(pin=Pin(18))
rpm_pid = PID(
Kp=config["PID"]["Kp"],
Ki=config["PID"]["Ki"],
Kd=config["PID"]["Kd"],
setpoint=0,
sample_time=None,
output_limits=(0.0, 1.0),
# proportional_on_measurement=True,
)
while True:
rpm_pid.setpoint = state["target_rpm"]
# read ESC telemetry
if uart.any() >= 10:
telemetry = decode_ESC_telemetry(uart.read())
if telemetry is not None:
state["rpm"] = telemetry[5]
throttle = rpm_pid(state["rpm"])
# print(
# "Throttle:",
# throttle,
# "pid components:",
# rpm_pid.components,
# "RPM:",
# state["rpm"],
# )
if state["target_rpm"] == 0 and state["rpm"] < 1000:
throttle = 0
rpm_pid.reset()
dshot.set_throttle(throttle)
await uasyncio.sleep_ms(1)
def debounce_button(p):
p.irq(trigger=Pin.IRQ_FALLING, handler=None) # remove irq
timer0 = Timer(0)
timer0.init(period=20, mode=Timer.ONE_SHOT, callback=lambda t: on_button_press(p))
def on_button_press(p):
p.irq(trigger=Pin.IRQ_FALLING, handler=debounce_button) # restore irq
if p.value() == 1: # debounce
return
global state
global config
global rotary
if state["view"] == start_view:
if rotary.value() == 0:
state["view"] = edit_deposit_view
rotary.set(
min_val=0,
max_val=1000,
range_mode=RotaryIRQ.RANGE_BOUNDED,
value=int(0.01 * config["deposit_rpm"]),
)
return
if rotary.value() == 1:
state["view"] = deposit_view
state["target_rpm"] = config["deposit_rpm"]
return
if state["view"] == edit_deposit_view:
state["view"] = edit_coating_rpm_view
rotary.set(
min_val=0,
max_val=1000,
range_mode=RotaryIRQ.RANGE_BOUNDED,
value=int(0.01 * config["coating_rpm"]),
)
return
if state["view"] == edit_coating_rpm_view:
state["view"] = edit_coating_time_view
rotary.set(
min_val=0,
max_val=9999,
range_mode=RotaryIRQ.RANGE_BOUNDED,
value=config["coating_time"],
)
return
if state["view"] == edit_coating_time_view:
save_config()
rotary.set(min_val=0, max_val=1, range_mode=RotaryIRQ.RANGE_BOUNDED, value=0)
state["view"] = start_view
return
if state["view"] == deposit_view:
state["view"] = coating_view
start_coating(state)
return
if state["view"] == coating_view:
stop_coating()
return
def start_coating(state):
global timer1
global timer2
state["timer"] = config["coating_time"]
timer1.init(
period=config["coating_time"] * 1000,
mode=Timer.ONE_SHOT,
callback=lambda t: stop_coating(),
)
def decrement_timer(t):
state["timer"] -= 1
timer2.init(period=1000, mode=Timer.PERIODIC, callback=decrement_timer)
# state["throttle"] = 0.10
state["target_rpm"] = config["coating_rpm"]
def stop_coating():
global state
global rotary
global timer1
global timer2
timer1.deinit()
timer2.deinit()
state["target_rpm"] = 0
rotary.set(min_val=0, max_val=1, range_mode=RotaryIRQ.RANGE_BOUNDED, value=0)
state["view"] = start_view
def save_config():
global config
with open("config.json", "w") as f:
json.dump(config, f)
# using default address 0x3c
i2c = I2C(1, sda=Pin(21), scl=Pin(22))
display = ssd1306.SSD1306_I2C(128, 64, i2c)
display.rotate(0)
timer1 = Timer(1)
timer2 = Timer(2)
splash()
rotary = RotaryIRQ(
pin_num_clk=14,
pin_num_dt=13,
min_val=0,
max_val=1,
range_mode=RotaryIRQ.RANGE_BOUNDED,
pull_up=True,
)
button = Pin(19, Pin.IN, Pin.PULL_UP)
button.irq(trigger=Pin.IRQ_FALLING, handler=on_button_press)
uart = UART(1, baudrate=115200, rx=5) # to receive ESC telemetry
state = {
"view": start_view,
"rpm": 0,
"target_rpm": 0,
"timer": 0,
}
with open("config.json", "r") as f:
config = json.load(f)
event_loop = uasyncio.get_event_loop()
event_loop.create_task(update_display())
event_loop.create_task(update_motor())
event_loop.run_forever()

256
src/pid.py Normal file
View file

@ -0,0 +1,256 @@
# adapted from https://github.com/m-lundberg/simple-pid
import time
def _clamp(value, limits):
lower, upper = limits
if value is None:
return None
elif (upper is not None) and (value > upper):
return upper
elif (lower is not None) and (value < lower):
return lower
return value
class PID(object):
"""A simple PID controller."""
def __init__(
self,
Kp=1.0,
Ki=0.0,
Kd=0.0,
setpoint=0,
sample_time=0.01,
output_limits=(None, None),
auto_mode=True,
proportional_on_measurement=False,
differetial_on_measurement=True,
error_map=None,
):
"""
Initialize a new PID controller.
:param Kp: The value for the proportional gain Kp
:param Ki: The value for the integral gain Ki
:param Kd: The value for the derivative gain Kd
:param setpoint: The initial setpoint that the PID will try to achieve
:param sample_time: The time in seconds which the controller should wait before generating
a new output value. The PID works best when it is constantly called (eg. during a
loop), but with a sample time set so that the time difference between each update is
(close to) constant. If set to None, the PID will compute a new output value every time
it is called.
:param output_limits: The initial output limits to use, given as an iterable with 2
elements, for example: (lower, upper). The output will never go below the lower limit
or above the upper limit. Either of the limits can also be set to None to have no limit
in that direction. Setting output limits also avoids integral windup, since the
integral term will never be allowed to grow outside of the limits.
:param auto_mode: Whether the controller should be enabled (auto mode) or not (manual mode)
:param proportional_on_measurement: Whether the proportional term should be calculated on
the input directly rather than on the error (which is the traditional way). Using
proportional-on-measurement avoids overshoot for some types of systems.
:param differetial_on_measurement: Whether the differential term should be calculated on
the input directly rather than on the error (which is the traditional way).
:param error_map: Function to transform the error value in another constrained value.
"""
self.Kp, self.Ki, self.Kd = Kp, Ki, Kd
self.setpoint = setpoint
self.sample_time = sample_time
self._min_output, self._max_output = None, None
self._auto_mode = auto_mode
self.proportional_on_measurement = proportional_on_measurement
self.differetial_on_measurement = differetial_on_measurement
self.error_map = error_map
self._proportional = 0
self._integral = 0
self._derivative = 0
self._last_time = None
self._last_output = None
self._last_error = None
self._last_input = None
# try:
# # Get monotonic time to ensure that time deltas are always positive
# self.time_fn = time.monotonic
# except AttributeError:
# # time.monotonic() not available (using python < 3.3), fallback to time.time()
# self.time_fn = time.time
self.time_fn = lambda: 0.001 * time.ticks_ms()
self.output_limits = output_limits
self.reset()
def __call__(self, input_, dt=None):
"""
Update the PID controller.
Call the PID controller with *input_* and calculate and return a control output if
sample_time seconds has passed since the last update. If no new output is calculated,
return the previous output instead (or None if no value has been calculated yet).
:param dt: If set, uses this value for timestep instead of real time. This can be used in
simulations when simulation time is different from real time.
"""
if not self.auto_mode:
return self._last_output
now = self.time_fn()
if dt is None:
dt = now - self._last_time if (now - self._last_time) else 1e-16
elif dt <= 0:
raise ValueError('dt has negative value {}, must be positive'.format(dt))
if self.sample_time is not None and dt < self.sample_time and self._last_output is not None:
# Only update every sample_time seconds
return self._last_output
# Compute error terms
error = self.setpoint - input_
d_input = input_ - (self._last_input if (self._last_input is not None) else input_)
d_error = error - (self._last_error if (self._last_error is not None) else error)
# Check if must map the error
if self.error_map is not None:
error = self.error_map(error)
# Compute the proportional term
if not self.proportional_on_measurement:
# Regular proportional-on-error, simply set the proportional term
self._proportional = self.Kp * error
else:
# Add the proportional error on measurement to error_sum
self._proportional -= self.Kp * d_input
# Compute integral and derivative terms
self._integral += self.Ki * error * dt
self._integral = _clamp(self._integral, self.output_limits) # Avoid integral windup
if self.differetial_on_measurement:
self._derivative = -self.Kd * d_input / dt
else:
self._derivative = self.Kd * d_error / dt
# Compute final output
output = self._proportional + self._integral + self._derivative
output = _clamp(output, self.output_limits)
# Keep track of state
self._last_output = output
self._last_input = input_
self._last_error = error
self._last_time = now
return output
def __repr__(self):
return (
'{self.__class__.__name__}('
'Kp={self.Kp!r}, Ki={self.Ki!r}, Kd={self.Kd!r}, '
'setpoint={self.setpoint!r}, sample_time={self.sample_time!r}, '
'output_limits={self.output_limits!r}, auto_mode={self.auto_mode!r}, '
'proportional_on_measurement={self.proportional_on_measurement!r}, '
'differetial_on_measurement={self.differetial_on_measurement!r}, '
'error_map={self.error_map!r}'
')'
).format(self=self)
@property
def components(self):
"""
The P-, I- and D-terms from the last computation as separate components as a tuple. Useful
for visualizing what the controller is doing or when tuning hard-to-tune systems.
"""
return self._proportional, self._integral, self._derivative
@property
def tunings(self):
"""The tunings used by the controller as a tuple: (Kp, Ki, Kd)."""
return self.Kp, self.Ki, self.Kd
@tunings.setter
def tunings(self, tunings):
"""Set the PID tunings."""
self.Kp, self.Ki, self.Kd = tunings
@property
def auto_mode(self):
"""Whether the controller is currently enabled (in auto mode) or not."""
return self._auto_mode
@auto_mode.setter
def auto_mode(self, enabled):
"""Enable or disable the PID controller."""
self.set_auto_mode(enabled)
def set_auto_mode(self, enabled, last_output=None):
"""
Enable or disable the PID controller, optionally setting the last output value.
This is useful if some system has been manually controlled and if the PID should take over.
In that case, disable the PID by setting auto mode to False and later when the PID should
be turned back on, pass the last output variable (the control variable) and it will be set
as the starting I-term when the PID is set to auto mode.
:param enabled: Whether auto mode should be enabled, True or False
:param last_output: The last output, or the control variable, that the PID should start
from when going from manual mode to auto mode. Has no effect if the PID is already in
auto mode.
"""
if enabled and not self._auto_mode:
# Switching from manual mode to auto, reset
self.reset()
self._integral = last_output if (last_output is not None) else 0
self._integral = _clamp(self._integral, self.output_limits)
self._auto_mode = enabled
@property
def output_limits(self):
"""
The current output limits as a 2-tuple: (lower, upper).
See also the *output_limits* parameter in :meth:`PID.__init__`.
"""
return self._min_output, self._max_output
@output_limits.setter
def output_limits(self, limits):
"""Set the output limits."""
if limits is None:
self._min_output, self._max_output = None, None
return
min_output, max_output = limits
if (None not in limits) and (max_output < min_output):
raise ValueError('lower limit must be less than upper limit')
self._min_output = min_output
self._max_output = max_output
self._integral = _clamp(self._integral, self.output_limits)
self._last_output = _clamp(self._last_output, self.output_limits)
def reset(self):
"""
Reset the PID controller internals.
This sets each term to 0 as well as clearing the integral, the last output and the last
input (derivative calculation).
"""
self._proportional = 0
self._integral = 0
self._derivative = 0
self._integral = _clamp(self._integral, self.output_limits)
self._last_time = self.time_fn()
self._last_output = None
self._last_input = None

170
src/rotary.py Normal file
View file

@ -0,0 +1,170 @@
# The MIT License (MIT)
# Copyright (c) 2022 Mike Teachman
# https://opensource.org/licenses/MIT
# Platform-independent MicroPython code for the rotary encoder module
# Documentation:
# https://github.com/MikeTeachman/micropython-rotary
import micropython
_DIR_CW = const(0x10) # Clockwise step
_DIR_CCW = const(0x20) # Counter-clockwise step
# Rotary Encoder States
_R_START = const(0x0)
_R_CW_1 = const(0x1)
_R_CW_2 = const(0x2)
_R_CW_3 = const(0x3)
_R_CCW_1 = const(0x4)
_R_CCW_2 = const(0x5)
_R_CCW_3 = const(0x6)
_R_ILLEGAL = const(0x7)
_transition_table = [
# |------------- NEXT STATE -------------| |CURRENT STATE|
# CLK/DT CLK/DT CLK/DT CLK/DT
# 00 01 10 11
[_R_START, _R_CCW_1, _R_CW_1, _R_START], # _R_START
[_R_CW_2, _R_START, _R_CW_1, _R_START], # _R_CW_1
[_R_CW_2, _R_CW_3, _R_CW_1, _R_START], # _R_CW_2
[_R_CW_2, _R_CW_3, _R_START, _R_START | _DIR_CW], # _R_CW_3
[_R_CCW_2, _R_CCW_1, _R_START, _R_START], # _R_CCW_1
[_R_CCW_2, _R_CCW_1, _R_CCW_3, _R_START], # _R_CCW_2
[_R_CCW_2, _R_START, _R_CCW_3, _R_START | _DIR_CCW], # _R_CCW_3
[_R_START, _R_START, _R_START, _R_START]] # _R_ILLEGAL
_transition_table_half_step = [
[_R_CW_3, _R_CW_2, _R_CW_1, _R_START],
[_R_CW_3 | _DIR_CCW, _R_START, _R_CW_1, _R_START],
[_R_CW_3 | _DIR_CW, _R_CW_2, _R_START, _R_START],
[_R_CW_3, _R_CCW_2, _R_CCW_1, _R_START],
[_R_CW_3, _R_CW_2, _R_CCW_1, _R_START | _DIR_CW],
[_R_CW_3, _R_CCW_2, _R_CW_3, _R_START | _DIR_CCW],
[_R_START, _R_START, _R_START, _R_START],
[_R_START, _R_START, _R_START, _R_START]]
_STATE_MASK = const(0x07)
_DIR_MASK = const(0x30)
def _wrap(value, incr, lower_bound, upper_bound):
range = upper_bound - lower_bound + 1
value = value + incr
if value < lower_bound:
value += range * ((lower_bound - value) // range + 1)
return lower_bound + (value - lower_bound) % range
def _bound(value, incr, lower_bound, upper_bound):
return min(upper_bound, max(lower_bound, value + incr))
def _trigger(rotary_instance):
for listener in rotary_instance._listener:
listener()
class Rotary(object):
RANGE_UNBOUNDED = const(1)
RANGE_WRAP = const(2)
RANGE_BOUNDED = const(3)
def __init__(self, min_val, max_val, reverse, range_mode, half_step, invert):
self._min_val = min_val
self._max_val = max_val
self._reverse = -1 if reverse else 1
self._range_mode = range_mode
self._value = min_val
self._state = _R_START
self._half_step = half_step
self._invert = invert
self._listener = []
def set(self, value=None, min_val=None,
max_val=None, reverse=None, range_mode=None):
# disable DT and CLK pin interrupts
self._hal_disable_irq()
if value is not None:
self._value = value
if min_val is not None:
self._min_val = min_val
if max_val is not None:
self._max_val = max_val
if reverse is not None:
self._reverse = -1 if reverse else 1
if range_mode is not None:
self._range_mode = range_mode
self._state = _R_START
# enable DT and CLK pin interrupts
self._hal_enable_irq()
def value(self):
return self._value
def reset(self):
self._value = 0
def close(self):
self._hal_close()
def add_listener(self, l):
self._listener.append(l)
def remove_listener(self, l):
if l not in self._listener:
raise ValueError('{} is not an installed listener'.format(l))
self._listener.remove(l)
def _process_rotary_pins(self, pin):
old_value = self._value
clk_dt_pins = (self._hal_get_clk_value() <<
1) | self._hal_get_dt_value()
if self._invert:
clk_dt_pins = ~clk_dt_pins & 0x03
# Determine next state
if self._half_step:
self._state = _transition_table_half_step[self._state &
_STATE_MASK][clk_dt_pins]
else:
self._state = _transition_table[self._state &
_STATE_MASK][clk_dt_pins]
direction = self._state & _DIR_MASK
incr = 0
if direction == _DIR_CW:
incr = 1
elif direction == _DIR_CCW:
incr = -1
incr *= self._reverse
if self._range_mode == self.RANGE_WRAP:
self._value = _wrap(
self._value,
incr,
self._min_val,
self._max_val)
elif self._range_mode == self.RANGE_BOUNDED:
self._value = _bound(
self._value,
incr,
self._min_val,
self._max_val)
else:
self._value = self._value + incr
try:
if old_value != self._value and len(self._listener) != 0:
micropython.schedule(_trigger, self)
except:
pass

76
src/rotary_irq_esp.py Normal file
View file

@ -0,0 +1,76 @@
# The MIT License (MIT)
# Copyright (c) 2020 Mike Teachman
# https://opensource.org/licenses/MIT
# Platform-specific MicroPython code for the rotary encoder module
# ESP8266/ESP32 implementation
# Documentation:
# https://github.com/MikeTeachman/micropython-rotary
from machine import Pin
from rotary import Rotary
from sys import platform
_esp8266_deny_pins = [16]
class RotaryIRQ(Rotary):
def __init__(self, pin_num_clk, pin_num_dt, min_val=0, max_val=10,
reverse=False, range_mode=Rotary.RANGE_UNBOUNDED, pull_up=False, half_step=False, invert=False):
if platform == 'esp8266':
if pin_num_clk in _esp8266_deny_pins:
raise ValueError(
'%s: Pin %d not allowed. Not Available for Interrupt: %s' %
(platform, pin_num_clk, _esp8266_deny_pins))
if pin_num_dt in _esp8266_deny_pins:
raise ValueError(
'%s: Pin %d not allowed. Not Available for Interrupt: %s' %
(platform, pin_num_dt, _esp8266_deny_pins))
super().__init__(min_val, max_val, reverse, range_mode, half_step, invert)
if pull_up == True:
self._pin_clk = Pin(pin_num_clk, Pin.IN, Pin.PULL_UP)
self._pin_dt = Pin(pin_num_dt, Pin.IN, Pin.PULL_UP)
else:
self._pin_clk = Pin(pin_num_clk, Pin.IN)
self._pin_dt = Pin(pin_num_dt, Pin.IN)
self._enable_clk_irq(self._process_rotary_pins)
self._enable_dt_irq(self._process_rotary_pins)
def _enable_clk_irq(self, callback=None):
self._pin_clk.irq(
trigger=Pin.IRQ_RISING | Pin.IRQ_FALLING,
handler=callback)
def _enable_dt_irq(self, callback=None):
self._pin_dt.irq(
trigger=Pin.IRQ_RISING | Pin.IRQ_FALLING,
handler=callback)
def _disable_clk_irq(self):
self._pin_clk.irq(handler=None)
def _disable_dt_irq(self):
self._pin_dt.irq(handler=None)
def _hal_get_clk_value(self):
return self._pin_clk.value()
def _hal_get_dt_value(self):
return self._pin_dt.value()
def _hal_enable_irq(self):
self._enable_clk_irq(self._process_rotary_pins)
self._enable_dt_irq(self._process_rotary_pins)
def _hal_disable_irq(self):
self._disable_clk_irq()
self._disable_dt_irq()
def _hal_close(self):
self._hal_disable_irq()

163
src/ssd1306.py Normal file
View file

@ -0,0 +1,163 @@
# MicroPython SSD1306 OLED driver, I2C and SPI interfaces
from micropython import const
import framebuf
# register definitions
SET_CONTRAST = const(0x81)
SET_ENTIRE_ON = const(0xA4)
SET_NORM_INV = const(0xA6)
SET_DISP = const(0xAE)
SET_MEM_ADDR = const(0x20)
SET_COL_ADDR = const(0x21)
SET_PAGE_ADDR = const(0x22)
SET_DISP_START_LINE = const(0x40)
SET_SEG_REMAP = const(0xA0)
SET_MUX_RATIO = const(0xA8)
SET_IREF_SELECT = const(0xAD)
SET_COM_OUT_DIR = const(0xC0)
SET_DISP_OFFSET = const(0xD3)
SET_COM_PIN_CFG = const(0xDA)
SET_DISP_CLK_DIV = const(0xD5)
SET_PRECHARGE = const(0xD9)
SET_VCOM_DESEL = const(0xDB)
SET_CHARGE_PUMP = const(0x8D)
# Subclassing FrameBuffer provides support for graphics primitives
# http://docs.micropython.org/en/latest/pyboard/library/framebuf.html
class SSD1306(framebuf.FrameBuffer):
def __init__(self, width, height, external_vcc):
self.width = width
self.height = height
self.external_vcc = external_vcc
self.pages = self.height // 8
self.buffer = bytearray(self.pages * self.width)
super().__init__(self.buffer, self.width, self.height, framebuf.MONO_VLSB)
self.init_display()
def init_display(self):
for cmd in (
SET_DISP, # display off
# address setting
SET_MEM_ADDR,
0x00, # horizontal
# resolution and layout
SET_DISP_START_LINE, # start at line 0
SET_SEG_REMAP | 0x01, # column addr 127 mapped to SEG0
SET_MUX_RATIO,
self.height - 1,
SET_COM_OUT_DIR | 0x08, # scan from COM[N] to COM0
SET_DISP_OFFSET,
0x00,
SET_COM_PIN_CFG,
0x02 if self.width > 2 * self.height else 0x12,
# timing and driving scheme
SET_DISP_CLK_DIV,
0x80,
SET_PRECHARGE,
0x22 if self.external_vcc else 0xF1,
SET_VCOM_DESEL,
0x30, # 0.83*Vcc
# display
SET_CONTRAST,
0xFF, # maximum
SET_ENTIRE_ON, # output follows RAM contents
SET_NORM_INV, # not inverted
SET_IREF_SELECT,
0x30, # enable internal IREF during display on
# charge pump
SET_CHARGE_PUMP,
0x10 if self.external_vcc else 0x14,
SET_DISP | 0x01, # display on
): # on
self.write_cmd(cmd)
self.fill(0)
self.show()
def poweroff(self):
self.write_cmd(SET_DISP)
def poweron(self):
self.write_cmd(SET_DISP | 0x01)
def contrast(self, contrast):
self.write_cmd(SET_CONTRAST)
self.write_cmd(contrast)
def invert(self, invert):
self.write_cmd(SET_NORM_INV | (invert & 1))
def rotate(self, rotate):
self.write_cmd(SET_COM_OUT_DIR | ((rotate & 1) << 3))
self.write_cmd(SET_SEG_REMAP | (rotate & 1))
def show(self):
x0 = 0
x1 = self.width - 1
if self.width != 128:
# narrow displays use centred columns
col_offset = (128 - self.width) // 2
x0 += col_offset
x1 += col_offset
self.write_cmd(SET_COL_ADDR)
self.write_cmd(x0)
self.write_cmd(x1)
self.write_cmd(SET_PAGE_ADDR)
self.write_cmd(0)
self.write_cmd(self.pages - 1)
self.write_data(self.buffer)
class SSD1306_I2C(SSD1306):
def __init__(self, width, height, i2c, addr=0x3C, external_vcc=False):
self.i2c = i2c
self.addr = addr
self.temp = bytearray(2)
self.write_list = [b"\x40", None] # Co=0, D/C#=1
super().__init__(width, height, external_vcc)
def write_cmd(self, cmd):
self.temp[0] = 0x80 # Co=1, D/C#=0
self.temp[1] = cmd
self.i2c.writeto(self.addr, self.temp)
def write_data(self, buf):
self.write_list[1] = buf
self.i2c.writevto(self.addr, self.write_list)
class SSD1306_SPI(SSD1306):
def __init__(self, width, height, spi, dc, res, cs, external_vcc=False):
self.rate = 10 * 1024 * 1024
dc.init(dc.OUT, value=0)
res.init(res.OUT, value=0)
cs.init(cs.OUT, value=1)
self.spi = spi
self.dc = dc
self.res = res
self.cs = cs
import time
self.res(1)
time.sleep_ms(1)
self.res(0)
time.sleep_ms(10)
self.res(1)
super().__init__(width, height, external_vcc)
def write_cmd(self, cmd):
self.spi.init(baudrate=self.rate, polarity=0, phase=0)
self.cs(1)
self.dc(0)
self.cs(0)
self.spi.write(bytearray([cmd]))
self.cs(1)
def write_data(self, buf):
self.spi.init(baudrate=self.rate, polarity=0, phase=0)
self.cs(1)
self.dc(1)
self.cs(0)
self.spi.write(buf)
self.cs(1)