180 lines
6.6 KiB
Python
180 lines
6.6 KiB
Python
from sh1106 import SH1106_I2C
|
|
from rotary_irq_esp import RotaryIRQ
|
|
|
|
import uasyncio as asyncio
|
|
from time import ticks_ms, ticks_add, ticks_diff
|
|
|
|
|
|
from homie.constants import BOOLEAN, TRUE, FALSE, FLOAT
|
|
from homie.property import HomieProperty
|
|
from homie.node import HomieNode
|
|
from homie.device import await_ready_state
|
|
from machine import Pin
|
|
from primitives.pushbutton import Pushbutton
|
|
|
|
|
|
class PlantBoxControlPanel(HomieNode):
    """Homie node for the plant box front panel.

    The panel combines an SH1106 OLED display, a rotary encoder and a push
    button.  It exposes three settable Homie properties:

    * ``button_pressed`` -- mirrors the button state; writing ``true``
      simulates a press followed by a release.
    * ``screen_on`` -- whether the panel considers the screen switched on.
    * ``screen_timeout`` -- seconds of inactivity before the screen is
      switched off.

    A background task (started from ``__init__``) polls the encoder and the
    temperature sensor and redraws the screen when something changed.
    """

    # Geometry passed to the display driver and used for the layout.
    _DISPLAY_WIDTH = 128
    _DISPLAY_HEIGHT = 64

    def __init__(self, id, name, waterlevel_sensor, bmp280,
                 i2c, pin_clk, pin_dt, pin_sw):
        """Set up display, encoder, button and Homie properties.

        Args:
            id: Homie node id, forwarded to ``HomieNode``.
            name: Homie node name, forwarded to ``HomieNode``.
            waterlevel_sensor: Stored on the instance; not used by this class.
            bmp280: Object whose ``temperature`` attribute is shown on screen.
            i2c: I2C bus handed to the SH1106 driver.
            pin_clk: ``Pin`` connected to the encoder clock line.
            pin_dt: ``Pin`` connected to the encoder data line.
            pin_sw: ``Pin`` connected to the push button; re-initialised here
                as an input with pull-up.
        """
        super().__init__(id=id, name=name, type="controller")
        self.i2c = i2c
        self.bmp280 = bmp280
        self.waterlevel_sensor = waterlevel_sensor

        self.display = SH1106_I2C(
            i2c=self.i2c,
            width=self._DISPLAY_WIDTH,
            height=self._DISPLAY_HEIGHT)
        self.display.rotate(True, False)

        self.rotary_encoder = RotaryIRQ(
            pin_num_clk=self._pin_number(pin_clk),
            pin_num_dt=self._pin_number(pin_dt),
            min_val=0,
            max_val=3,
            reverse=False,
            pull_up=True,
            range_mode=RotaryIRQ.RANGE_WRAP)

        pin_sw.init(mode=Pin.IN, pull=Pin.PULL_UP)
        self.button = Pushbutton(pin_sw, suppress=True, sense=1)

        self._screen_on = False
        # Initialised here so the redraw code can always read it, even if
        # ``_screen_on`` is ever set without going through _set_screen_on().
        self.screen_just_turned_on = False
        self.needs_redraw = False
        self.update_interval = 0.3  # seconds between polling passes

        self.property_button_pressed = HomieProperty(
            id="button_pressed",
            name="Knöpgen gedrückt",
            settable=True,
            default=FALSE,
            on_message=self._on_button_pressed_msg,
            datatype=BOOLEAN,
        )
        self.add_property(self.property_button_pressed)

        self.property_screen_on = HomieProperty(
            id="screen_on",
            name="Bildschrim angeschaltet",
            settable=True,
            default=FALSE,
            on_message=self.on_screen_on_msg,
            datatype=BOOLEAN,
        )
        self.add_property(self.property_screen_on)

        self.screen_timeout = 30  # seconds
        self.screen_timeout_ticks = 0  # deadline on the ticks_ms() clock
        self.property_screen_timeout = HomieProperty(
            id="screen_timeout",
            name="Bildschrimtimeout",
            settable=True,
            default=str(self.screen_timeout),
            on_message=self.on_screen_timeout_msg,
            datatype=FLOAT,
        )
        self.add_property(self.property_screen_timeout)

        # Optional zero-argument hooks invoked on button press / release.
        self.on_button_pressed = None
        self.on_button_released = None
        asyncio.create_task(self._update_data_async())

    @staticmethod
    def _pin_number(pin):
        """Return the GPIO number parsed from ``str(pin)``.

        Drops the first four characters and the last one of the string form.
        NOTE(review): this assumes ``str(pin)`` looks like ``Pin(12)``;
        confirm for the target port.
        """
        return int(str(pin)[4:-1])

    @staticmethod
    def _parse_bool(payload):
        """Map a Homie boolean payload to ``True``/``False``.

        Returns ``None`` when the payload is neither ``TRUE`` nor ``FALSE``.
        """
        return {FALSE: False, TRUE: True}.get(payload)

    def _on_button_pressed_msg(self, topic, payload, retained):
        """Simulate a button press + release when ``true`` is received.

        Nothing happens for ``false``, for an invalid payload, or while
        ``self.button.rawstate()`` is truthy.
        """
        if self._parse_bool(payload) and not self.button.rawstate():
            self._on_button_pressed()
            self._on_button_released()

    def _on_button_pressed(self):
        """Publish the pressed state, request a redraw and run the hook."""
        self.property_button_pressed.value = TRUE
        self.needs_redraw = True
        if self.on_button_pressed:
            self.on_button_pressed()

    def _on_button_released(self):
        """Publish the released state, wake the screen and run the hook."""
        self.property_button_pressed.value = FALSE
        # Switches the screen on, or restarts its timeout if already on.
        self._set_screen_on(True)
        self.needs_redraw = True
        if self.on_button_released:
            self.on_button_released()

    def on_screen_on_msg(self, topic, payload, retained):
        """Switch the screen on or off from a Homie message.

        Payloads other than ``TRUE``/``FALSE`` are ignored.
        """
        state = self._parse_bool(payload)
        if state is None:
            return
        self._set_screen_on(state)

    def on_screen_timeout_msg(self, topic, payload, retained):
        """Apply a new screen timeout (seconds) from a Homie message.

        The pending deadline is moved by the difference between the new and
        the old timeout.  Payloads that cannot be converted to a number of
        milliseconds are ignored.
        """
        try:
            new_screen_timeout = float(payload)
            new_ms = int(new_screen_timeout * 1000)
        except (ValueError, OverflowError):
            return
        old_ms = int(self.screen_timeout * 1000)
        # Shift the existing deadline instead of feeding a ticks_diff()
        # result into ticks_add(); the result is the same modulo the tick
        # period.
        self.screen_timeout_ticks = ticks_add(
            self.screen_timeout_ticks, new_ms - old_ms)
        self.screen_timeout = new_screen_timeout

    def reset_screen_timout(self):
        """Restart the timeout: deadline = now + ``screen_timeout`` seconds."""
        self.screen_timeout_ticks = ticks_add(
            ticks_ms(),
            int(self.screen_timeout * 1000))

    # @await_ready_state
    # NOTE(review): the decorator above was already disabled in the original.
    async def _update_data_async(self):
        """Poll encoder and sensor forever and redraw when needed.

        Also registers the press/release callbacks on the button and switches
        the screen off once the timeout deadline has passed.
        """
        self.button.press_func(self._on_button_pressed)
        self.button.release_func(self._on_button_released)
        rotary_last = 0
        last_temp = 0
        while True:
            if ticks_diff(self.screen_timeout_ticks, ticks_ms()) <= 0:
                self._set_screen_on(False)

            if self._screen_on:
                rotary_now = self.rotary_encoder.value()
                if rotary_now != rotary_last:
                    rotary_last = rotary_now
                    self.needs_redraw = True

                # Read the sensor once so the comparison and the value that
                # gets drawn are the same reading.
                temperature = self.bmp280.temperature
                if temperature != last_temp:
                    last_temp = temperature
                    self.needs_redraw = True

                if self.needs_redraw:
                    self.needs_redraw = False
                    self._draw_screen(temperature)
            else:
                # Screen is off: reset the menu position and drop both hooks.
                self.rotary_encoder.set(value=0)
                self.on_button_pressed = None
                self.on_button_released = None

            await asyncio.sleep_ms(int(self.update_interval * 1000.0))

    def _draw_screen(self, temperature):
        """Draw the temperature line and the action bar, then show it.

        The release hook is set to the selected action's callback, except on
        the first redraw after the screen was switched on.
        """
        display = self.display
        width = self._DISPLAY_WIDTH

        # Top line: clear it, then print the temperature.
        display.fill_rect(0, 0, width, 10, 0)
        display.text("{:1.2f}°C".format(temperature), 1, 1)

        # (label, callback run on button release while selected)
        actions = (
            ("Einst.", None),
            ("Wasser", lambda: self._set_screen_on(False)),
        )
        self.rotary_encoder.set(max_val=len(actions) - 1)
        selected_index = self.rotary_encoder.value()
        slot_width = width // len(actions)

        for index, (label, on_release) in enumerate(actions):
            selected = index == selected_index
            # The selected entry is drawn inverted.
            color = 0 if selected else 1
            left = slot_width * index
            display.fill_rect(left + 1, 50, slot_width - 2, 10, 1 - color)
            display.text(label, left + 2, 51, color)
            if selected and not self.screen_just_turned_on:
                self.on_button_released = on_release

        display.show()
        self.screen_just_turned_on = False

    @property
    def screen_on(self):
        """Whether the panel currently considers the screen switched on."""
        return self._screen_on

    @screen_on.setter
    def screen_on(self, on):
        self._set_screen_on(on)

    # The setter used to be defined under the name ``set_screen_on``, which
    # made *that* name the settable property.  Keep it as an alias so existing
    # callers keep working.
    set_screen_on = screen_on

    def _set_screen_on(self, on):
        """Switch the screen state and publish it.

        Requesting "on" while already on only restarts the timeout;
        requesting "off" while already off does nothing.
        """
        if on == self._screen_on:
            if on:
                self.reset_screen_timout()
            return

        self._screen_on = on
        if on:
            # NOTE(review): display.poweron() was commented out in the
            # original and is left disabled here -- confirm whether the
            # display needs it after poweroff().
            self.reset_screen_timout()
            self.needs_redraw = True
            self.screen_just_turned_on = True
        else:
            self.display.poweroff()
        # Assign to .value: rebinding the attribute would replace the
        # HomieProperty object with a plain string.
        self.property_screen_on.value = TRUE if on else FALSE
|