Stopky pro hasičské soutěže

Jak dobře zvládá Raspberry Pi časově náročný sběr dat přes GPIO? Pojďme to zjistit.

3 years ago   •   12 min read

By Vladimír Záhradník

Raspberry Pi je cenově velmi dostupné malé zařízení, které umíte využít v různých projektech. Má vynikající podporu Linuxu a jeho oficiální linuxová distribuce, Raspbery Pi OS, běží na všech podporovaných Raspberry Pi.

Pokud vyvíjíte projekt na Raspberry Pi 3B+ a následně vložíte vaši microSD kartu do Raspberry Pi Zero, je možné, že váš kód bude stále fungovat. Jednou z velkých výhod je 40-pinový GPIO konektor.

Díky knihovnám jako gpiozero dokáží i začátečníci používat GPIO bez komplikací. Ovšem Raspberry Pi není mikrokontrolér a Raspbery Pi OS není systém optimalizovaný pro použití v reálném čase. Pokud potřebujete provádět úkoly kritické na čas, můžete narazit na problémy, podobně jako jsme narazili i my.

V tomto blogu se podíváme na vývoj stopek pro hasičské soutěže a rozebereme si některé problémy, kterým jsme čelili. Celý zdrojový kód najdete zde.

Stopky pro hasičské soutěže

Náš projekt jsme pojmenovali „Stopky pro hasičské soutěže.“ Pokud netušíte, co se děje na takové hasičské soutěži, podívejte se na toto video.

Plán ukazující umístění kontrolních stanovišť a meřících zařízení

Máme čtyři kontrolní stanoviště. Když hasiči přijdou k stanovišti 4, stopky začnou měřit čas. Když přijdou k stanovišti 3, aplikace změří mezičas. Stanoviště 2 a 1 fungují stejně, akorát to stanoviště, ke kterému se dostanou hasiči jako k poslednímu, zastaví hodiny.

Neměříme pouze čas. Ale pokaždé, když hasiči přijdou ke stanovišti, změříme průtok vody, tlak v hadici, jakož i otáčky motoru. Také jsme přidali podporu pro ruční měření, které se spustí pomocí tlačítka, a výsledky zaznamenáváme do CSV souboru.

Všechny výsledky zobrazujeme během soutěže na externím monitoru. Pro tento projekt jsme se rozhodli použít Raspberry Pi. Je dostupné, nezabírá mnoho místa a má dostatek výkonu, aby zvládlo měření i zobrazování dat ve stejný čas.

Uživatelské rozhraní

Tento projekt jsme se rozhodli implementovat v Pythonu 3. Tento jazyk je široce používán a je vynikající pro rychlé prototypování. Jako knihovnu pro grafické prostředí jsme si vybrali Tkinter, která je postavena nad jednoduchou knihovnou Tk. Rozhraní v konečné podobě vypadá takto:

Grafické prostředí pro hasičské stopky

Pokud tuto knihovnu neznáte, celé prostředí vytváříte pomocí kódu. Tato knihovna poskytuje několik widgetů jako Frame (rám), Label (jmenovka) nebo Button (tlačítko). Vzhled widgetů můžete měnit vlastními styly a umisťovat je uvnitř okna prostřednictvím manažera geometrie. Ten nejvíce používaný manažer se nazývá grid (mřížka). Každý widget potřebujete umístit do tabulky se sloupci a řádky. Největší widget umístěn uvnitř sloupce nebo řádku určuje rozměry sloupce nebo řádku.

A tady je náš první problém. Pokud se podíváte na UI, vidíte, že když se aplikace spustí, nejsou v ní žádná data. Widgety zabírají méně místa a okno je menší. Jakmile začnete měřit hodnoty, velikost okna se rychle změní. Podívejte se na snímek obrazovky níže.

Dynamická změna velikosti okna při změně obsahu

Můžete vidět, jak je okno po spuštění zmenšené a jakmile začnete měřit data, se jeho velikost změní. Tady to nepůsobí až tak rušivě, ale v tomto případě jsme namodelovali nerealistický scénář, kde jsou všechny měřené hodnoty stejné. Ve výsledku tak manažer geometrie změní velikost okna pouze jednou.

No představme si scénář, který nastane ve skutečnosti. Snímané data jsou různé a v různých rozsazích. Když zastavíte motor, naměříte mu nula otáček. Ovšem tato hodnota se může velmi rychle vyšplhat až na 100 000 otáček/minutu, přičemž následně může klesnout pod 10 000 otáček. V takovém případě změní manažer geometrie velikost okna několikrát. Nepůsobí to velmi příjemně a museli jsme to řešit.

Náš první přístup byl takový, že jsme do widgetu vložili prázdné znaky a tím jsme vyhradili určitý prostor. Náš problém tento přístup nevyřešil, ale byl to krok správným směrem. Nakonec jsme se rozhodli, že si pro každou měřenou složku potřebujeme určit povolený rozsah hodnot. Například, povolíme otáčky motoru v rozsahu 0 až 99 999. Ve skutečném prostředí otáčky nikdy nedosáhnou nejvyšší hodnotu, ale takto můžeme požádat widget o vyhrazení dostatku místa, aby se do něj vešly všechny zadané data za všech okolností.

label = ttk.Label(content_frame, style='Customized.Main.TLabel',
                  padding=(30, 10), width=10, anchor='center')
label.grid(column=1, row=9)
label.grid_remove()

Šířku widgetu můžete určit pomocí parametru width. Více informací naleznete v dokumentaci. Text můžete umístit do středu widgetu atributem anchor. Předpokládáme, že vstup je menší než nejvyšší povolená hodnota. Nyní vypadá UI mnohem příjemnější.

Další věc, kterou jsme potřebovali dořešit, byla změna viditelnosti widgetu. Během ručního měření zobrazuje rozhraní čas zachycení dat a vedle něj symbol M. Potom po dvou sekundách je tato informace skrytá. V tomto případě nám přichází na pomoc manažer geometrie!

label = ttk.Label(content_frame, style='Customized.Main.TLabel',
                  padding=(30, 10), width=10, anchor='center')

## Umístění a zobrazení widgetu na obrazovce
label.grid(column=1, row=9)

# Nastavení widgetu jako neviditelného
label.grid_remove()

# A nastavení zpátky jako viditelného
label.grid()

Všimněte si, že když chceme nastavit widget jako viditelný, nepotřebujeme znovu zadávat pozici v mřížce. Manažer mřížky (grid manager) si pamatuje poslední umístění widgetu a okamžitě jej zobrazí.

Architektura

Naše aplikace je sestavena z několika tříd, jak můžete vidět na diagramu níže.

Celková architektura

Třída Stopwatch (stopky) slouží k ovládání hodin při vstupu od uživatele a ke spuštění měření. Třída FlowMeter (měřič průtoku) nepřetržitě měří průtok vody. Třída PressureTransducer (snímač tlaku) měří tlak ze dvou připojených senzorů, a třídaRpmMeter (měřič otáček) měří otáčky motoru. Všechny tyto třídy posílají zprávy do komponenty MainApp, který obsluhuje smyčku s událostmi (event loop) a vše zpracování řeší na hlavním UI vlákně. Pokud zkusíte aktualizovat UI widget z vlákna na pozadí, vaše aplikace pravděpodobně spadne. V Androidu to funguje velmi podobně.

Zpracování GPIO vstupů s knihovnou gpiozero

Gpiozero je malá knihovna, která abstrahuje operace nad GPIO. Původně byla postavena nad knihovnou RPi.GPIO, ale časem vývojáři přidali podporu i pro jiné knihovny. Umožňuje mimo jiné použít GPIO na dálku ve vašem PC, jako kdyby měl GPIO konektor přímo na jeho desce.

Abychom zpracovali GPIO vstupy, použili jsme komponentu Button (tlačítko). Tato komponenta vytváří své vlastní vlákno na pozadí jen aby mohlo zpracovávat GPIO vstupy. Neustále zjišťuje změny vstupu a když dojde ke změně, zavolá vaši funkci. Komponentu Button nepoužíváme pouze na zpracování stisknutí tlačítka, ale pro jakýkoliv vstup signálu. Pamatujte, že tlačítko je jen abstrakce a knihovna umožňuje nakonfigurovat vstupní pin tak, jak potřebujeme.

Následující kód ukazuje, jak můžete tlačítku přiřadit vstupní pin a registrovat v něm vaši funkci jako callback:

_STOPWATCH_TRIGGER_PIN = 26

start_button = Button(self._STOPWATCH_TRIGGER_PIN, pull_up=True, bounce_time=0.1)
start_button.when_pressed = lambda: self._start_watch()

def _start_watch(self):
    # Kód, který se spustí
    pass

Váš callback potřebujete připojit do vlastnosti when_pressed. Tato komponenta pro tlačítko očekává funkci se specifickými parametry. Ideálně byste jí měli dát odkaz na funkci bez vstupního parametru nebo můžete odevzdat funkci, která očekává jeden vstupní parametr. Když bude zavolána, bude mít správný odkaz na objekt Button.

Tlačítka používáme uvnitř tříd. Jejich funkce mají self jako vstupní parametr. Aby to mohlo fungovat, používáme anonymní lambda funkci, která zavolá potřebný kód.

S tímto přístupem jsme mohli vybudovat naše komponenty od základu. Komponenta pro stopky přímo obsluhuje vstupní signály na ovládání hodin, RpmMeter se spustí pokaždé, když přijme impuls ze senzoru v motoru atd.

Vzdálené GPIO

Gpiozero podporuje vzdálené GPIO. Nebudeme zacházet do podrobností. Jen vám řekneme, že je to velmi pohodlné a funguje to s Python kódem v počítači namísto přímo v Raspberry Pi. Když to jednou zkusíte, už o tuto funkci nechcete přijít. Svým způsobem to připomíná práci s Arduino deskami na počítači. Přestože je Raspberry Pi dostatečně výkonné na mnoho úkolů, vývoj kódu na mnohem výkonnějším počítači šetří čas.

Ale když zkusíte spustit skripty napsané pro Raspberry Pi na počítači, některé knihovny takové prostředí neočekávají a vyhodí výjimku. Proto jsme problematický kód vložili do „try…except“ bloku a funkce, které takové knihovny používají, jsme vypnuli.

Jedním příkladem je I2C komunikace pomocí knihovny Busio z balíku CircuitPython. Protože tato knihovna na počítači nepoběží, přes I2C nedostaneme data z tlakového senzoru, ale všechno ostatní funguje bez problémů.

Tento přístup nám umožnil odladit a vyřešit většinu problémů na počítači a strávit vývojem přímo na Raspberry Pi pouze potřebný čas.

Adafruit CircuitPython

CircuitPython je odnož MicroPythonu od firmy Adafruit. Jde o verzi Pythonu pro mikrokontroléry. Předtím, když jste si koupili senzor od Adafruitu, poskytli vám knihovny a návody pro plnohodnotnou verzi Pythonu, tak pro MicroPython. Od tohoto přístupu upustili, protože kód se hůře udržuje. Co místo toho Adafruit doporučuje je použít jejich vrstvu pro kompatibilitu, která se nazývá Blinka. Pomocí této knihovny můžete použít jejich kód napsaný pro CircuitPython, a toto je přístup, který jsme si vybrali i my.

Smyčka s událostmi (Event Loop)

Když se vyskytne nějaká událost, např. je stisknuta spoušť hodin, komponenta, v tomto případě, StopWatch, zpracuje tuto událost a oznámí to komponentě MainApp zavoláním funkce MainApp#post_on_ui_thread(event). MainApp obsluhuje smyčku s událostmi, která zpracovává všechny události a obnovuje UI dle potřeby. Jen připomínáme, že UI je třeba obsluhovat jen přes UI vlákno. Vnitřně používáme třídu pro synchronizovaný front (Queue).

# Fronta pro UI vlákno na aktualizaci komponent
self._thread_queue = queue.Queue()

Data do fronty můžete přidat takto:

def post_on_ui_thread(self, value):
    self._thread_queue.put(value)

A takto je můžete číst během obnovy UI:

def _update_ui(self):
    try:
        event = self._thread_queue.get(False)

        # Události bez dat
        if type(event) == str:
            if event == StopWatch.STOPWATCH_RESET:
                # Resetuj hodiny
                pass
            if event == StopWatch.MANUAL_MEASURE_ENDED:
                # Udělej nějakou práci
                pass

        # Události s daty ve slovnících (klíč = hodnota)
        elif type(event) == dict:
            checkpoint = None

            for eventKey, eventValue in event.items():
                if eventKey == StopWatch.SPLIT_TIME_MEASURED:
                    # Zobraz na obrazovce mezičas
                    pass

    except queue.Empty:
        pass

    # Tento řádek se postará o to, že funkce pro aktualizaci se bude volat pravidelně,
    # aby zpracovávala události z fronty. SCREEN_REFRESH_MS nám umožňuje přesně kontrolovat,
    # jak často se tato funkce bude volat.
    self._parent.after(self._SCREEN_REFRESH_MS, self._update_ui)

Zpracování dat ze senzorů

Senzory připojené k Raspberry Pi nám neposkytují hodnoty, které bychom mohli přímo použít. Namísto toho si musíme vypočítat, co potřebujeme. Postupně jak přicházejí vzorky, je ukládáme do cyklické fronty. Pro tento účel používáme třídu deque z modulu collections.

Například, pokud chcete uložit posledních deset vzorků podle toho, jak přišli, inicializujte třídu deque následovně:

self._samples = deque(maxlen=10)

Když přijde nový vzorek, přidá se na konec fronty, čímž se zahodí nejstarší vzorek.

self._samples.append(time.time())

S touhle frontou pracujete jako se standardním seznamem, který má deset prvků. Krása!

Výpočet otáček za minutu z impulsů

Náš senzor se nachází přímo v motoru. Generuje impuls pokaždé, když svíčka hodí jiskru. Na GPIO pinu dostáváme rychlé impulsy. Čím rychlejší jsou, tím vyšší jsou otáčky. Zachycujeme čas (v milisekundách) příchodu každého vzorku. Odtud umíme vypočítat časový rozdíl mezi vzorky a následně vypočítat frekvenci. A nakonec umíme odvodit z této frekvence počet otáček za minutu.

freq = 1 / ((self._samples[-1] - self._samples[0]) / self._MAX_QUEUE_LENGTH) / self._k_multiplier
    rpm = int(freq * 60)

V této chvíli počítáme časový rozdíl mezi deseti vzorky. Dokonalá vzorkovací přesnost na Raspberry Pi není zaručena a tento přístup nám pomůže alespoň částečně tento problém vyřešit.

Výpočet průtoku vody z impulsů

Svým způsobem je to podobné s předchozím případem, protože také na vstupu dostáváme impulzy. Abychom vypočítali průtok v litrech za sekundu, potřebujeme jen jinou rovnici.

freq = 1 / ((self._samples[-1] - self._samples[0]) / self._MAX_QUEUE_LENGTH)
    lpm = int(self._k * (freq + self._q))

Výpočet tlaku

Získání údajů o tlaku je složitější. Tlak může velmi rychle narůst i klesnout. Proto potřebujeme vypočítat jeho klouzavý průměr.

Implementace pro tento modul se liší. Senzor, který používáme, komunikuje po sběrnici I2C. V jeho vlákně nepoužíváme komponentu Button. Namísto toho potřebujeme vytvořit naše vlastní vlákno na pozadí, které bude periodicky sbírat ze senzoru nové vzorky. V tomto vlákně běží následující kód.

def _update_sliding_avg_pressure_thread(self):
    if self._i2c_initialized:
        self._is_measuring = True
        self._voltage_1_samples.append(self._adc_channels[0].voltage)
        self._voltage_2_samples.append(self._adc_channels[1].voltage)
        self._is_measuring = False

Když později budeme chtít zjistit tlak, vypočítáme jeho hodnotu následovně:

def get_sliding_avg_pressure(self):
    # Kĺzavý priemer sa vypočíta z _MAX_QUEUE_LENGTH vzoriek
    avg_p1 = sum(self._voltage_1_samples) / self._avg_samples_no
    avg_p2 = sum(self._voltage_2_samples) / self._avg_samples_no
    return tuple(map(self._calculate_pressure_from_input_value, [avg_p1, avg_p2]))

Konfigurační soubor

Když se podíváte blíže, uvidíte, že v našich vzorcích používáme různé uživatelem definované konstanty. Ty jsou definovány v konfiguračním souboru, který se načte při inicializaci aplikace. Tento konfigurační soubor nám dává flexibilitu, kterou potřebujeme. V našem kódu jsme si definovali záložní výchozí hodnoty.

Problémy

A teď k té zábavné části! Některé z těch méně významných problémů jsme již zmínili. Ty jsme už vyřešili. Pojďme se však nyní v krátkosti podívat na problémy, které jsme vyřešili jen částečně.

Otáčky motoru kolísají při ustáleném toku impulsů

Když generujeme impulzy se specifickou frekvencí, vypočítané otáčky za minutu by se neměly měnit. Ve skutečnosti tomu tak ovšem není. Během testování naše hodnota otáček za minutu kolísala v rozsahu +-100, což znamená, že pokud motor běží na 10 000 otáčkách, uvidíte hodnoty v rozsahu 9 950 až 10 050. Nakonec jsme dospěli k názoru, že pro tento typ projektu je taková přesnost přijatelná. To však neznamená, že jsme s tím spokojeni.

Průtok vody také kolísá

Problém velmi podobný tomu výše. Akorát, že průtok vody kolísá v menším rozsahu.

Změna tlaku se neprojeví okamžitě

Změna napětí se neprojevila okamžitou změnou tlaku. Asi bychom mohli více odladit výpočet tlaku. Změna velikosti cyklické fronty by mohla být efektivním řešením. Ovšem v našem případě je to takto přijatelné.

Protokolování (logování) značně zpomaluje sběr dát

V našem kódu ve velkém používáme modul logging. Na začátku jsme nastavili, aby zobrazoval všechny zprávy s prioritou DEBUG a vyšší. Ukázalo se, že je to problém, pokud se logování vyskytuje ve vláknech, které vzorkují data. Nakonec jsme věci urychlili tím, že jsme nastavili náš logger, aby vypisoval pouze upozornění a chybové zprávy.

Řešení

Když se podíváme na problémy, vidíme jistý vzorec. Přesnost vzorkování dát je ovlivněna, protože nepoužíváme hardware, který byl navržen pro práci v reálném čase. Také Raspbery Pi OS nemá linuxové jádro odladěné na práci v reálném čase. Všechna vlákna řídí plánovač v linuxovém jádře. Nemáte záruku, že vlákno na pozadí bude běžet na vyhrazeném jádru procesoru po celou dobu, bez přerušení. Toto je ten důvod, proč nedostáváme vzorky dat v pravidelných intervalech.

Občas Raspberry vzorky nezachytí nebo zaznamená čas jejich příjezdu s mírným posunem. Když pracujete s relativně vyššími frekvencemi, už na tom záleží. Takové malé rozdíly způsobují kolísání otáček, které není zanedbatelné.

Kdybychom v budoucnu začali pracovat na podobném projektu jako je tento, asi bychom zvážili použít vyhrazený mikrokontrolér pouze na to, aby sbíral všechny vzorky v přesném čase a počítal cílové hodnoty v době jeho nečinnosti.

Arduino mikrokontroléry vypadají být jako správná volba. Podporují přerušení a proto můžeme připojit rutinu, která bude zpracovávat vzorky s nejvyšší prioritou. A když se právě nezpracovává žádný vzorek, tento mikrokontrolér má stále dostatek strojového času, aby dělal všechny ty výpočty. Konec-konců, neběží na něm žádný operační systém.

Když Arduino modul shromáždí data, může pravidelně posílat souhrnné zprávy na Raspberry Pi, které zobrazí obsah na obrazovce. I přes pár zádrhelů považujeme stále Raspberry Pi za perfektní nástroj k obsluze takových jednoduchých UI.

Závěr

Možnost mít GPIO na Raspberry Pi je skvělá. Můžete pracovat na mnoha problémech, aniž jste narazili na problémy jako jsme měli my. Ponaučení z tohoto příběhu je takové, že pokud potřebujete nejvyšší přesnost, řešte to na hardwaru, který byl pro tento účel navržen.

Pracovali jste již někdy na projektu jako je tento? Narazili jste na některé ze zmíněných problémů? A jak jste je vyřešili? Napište nám to do komentáře.

Spread the word

Keep reading