Stopky pre hasičské súťaže

Ako dobre zvláda Raspberry Pi časovo náročný zber dát cez GPIO? Poďme to zistiť.

3 years ago   •   11 min read

By Vladimír Záhradník

Raspberry Pi je cenovo veľmi dostupné malé zariadenie, ktoré viete použiť v rôznych projektoch. Má vynikajúcu podporu Linuxu a jeho oficiálna linuxová distribúcia, Raspbery Pi OS, beží na všetkých podporovaných Raspberry Pi.

Pokiaľ vyvíjate projekt na Raspberry Pi 3B+ a následne vložíte vašu microSD kartu do Raspberry Pi Zero, je možné, že váš kód bude stále fungovať. Jednou z veľkých výhod je 40-pinový GPIO konektor.

Vďaka knižniciam ako gpiozero dokážu aj začiatočníci používať GPIO bez komplikácií. Avšak Raspberry Pi nie je mikrokontrolér a Raspbery Pi OS nie je systém optimalizovaný na použitie v reálnom čase. Pokiaľ potrebujete vykonávať úlohy kritické na čas, môžete naraziť na problémy, podobne ako sme narazili aj my.

V tomto blogu sa pozrieme na vývoj stopiek pre hasičské súťaže a rozoberieme si niektoré problémy, ktorým sme čelili. Celý zdrojový kód nájdete tu.

Stopky pre hasičské súťaže

Náš projekt sme nazvali „Stopky pre hasičské súťaže.“ Pokiaľ netušíte, čo sa deje na takej hasičskej súťaži, pozrite si toto video.

Plán ukazujúci umiestnenie kontrolných stanovísk a meracích zariadení

Máme štyri kontrolné stanoviská. Keď hasiči prídu k stanovisku 4, stopky začnú merať čas. Keď prídu k stanovisku 3, aplikácia zmeria medzičas. Stanoviská 2 a 1 fungujú rovnako, akurát to stanovisko, ku ktorému sa dostanú hasiči ako k poslednému, zastaví hodiny.

Nemeriame iba čas. Ale zakaždým, keď hasiči prídu k stanovisku, zmeriame prietok vody, tlak v hadici, ako aj otáčky motora. Taktiež sme pridali podporu pre ručné meranie, ktoré sa spustí pomocou tlačidla, a výsledky zaznamenávame do CSV súboru.

Všetky výsledky zobrazujeme počas súťaže na externom monitore. Pre tento projekt sme sa rozhodli použiť Raspberry Pi. Je dostupné, nezaberá veľa miesta a má dostatok výkonu, aby zvládlo meranie aj zobrazovanie dát v rovnaký čas.

Používateľské rozhranie

Tento projekt sme sa rozhodli implementovať v Pythone 3. Tento jazyk je široko používaný a je vynikajúci na rýchle prototypovanie. Ako knižnicu pre grafické prostredie sme si vybrali Tkinter, ktorá je postavená nad jednoduchou knižnicou Tk. Rozhranie v konečnej podobe vyzerá takto:

Grafické prostredie pre hasičské stopky

Pokiaľ túto knižnicu nepoznáte, celé prostredie vytvárate pomocou kódu. Táto knižnica poskytuje viaceré widgety ako Frame (rám), Label (menovka) alebo Button (tlačidlo). Vzhľad widgetov môžete meniť vlastnými štýlmi a umiestňovať ich vnútri okna prostredníctvom manažéra geometrie. Ten najviac používaný manažér sa nazýva grid (mriežka). Každý widget potrebujete umiestniť do tabuľky so stĺpcami a riadkami. Najväčší widget umiestnený vnútri stĺpca alebo riadka určuje rozmery stĺpca alebo riadka.

A tu je náš prvý problém. Ak sa pozriete na UI, vidíte, že keď sa aplikácia spustí, nie sú v nej žiadne dáta. Widgety zaberajú menej miesta a okno je menšie. Hneď ako začnete merať hodnoty, veľkosť okna sa rýchlo zmení. Pozrite sa na snímok obrazovky nižšie.

Dynamická zmena veľkosti okna pri zmene obsahu

Môžete vidieť, ako je okno po spustení zmenšené a hneď ako začnete merať dáta, sa jeho veľkosť zmení. Tu to nepôsobí až tak rušivo, ale v tomto prípade sme namodelovali nerealistický scenár, kde sú všetky merané hodnoty rovnaké. Vo výsledku tak manažér geometrie zmení veľkosť okna iba raz.

No predstavme si scenár, ktorý nastane v skutočnosti. Snímané dáta sú rôzne a v rôznych rozsahoch. Keď zastavíte motor, nameráte mu nula otáčok. Avšak táto hodnota sa môže veľmi rýchlo vyšplhať až na 100 000 otáčok/minútu, pričom následne môže klesnúť pod 10 000 otáčok. V takom prípade zmení manažér geometrie veľkosť okna niekoľkokrát. Nepôsobí to veľmi príjemne a museli sme to riešiť.

Náš prvý prístup bol taký, že sme do widgetu vložili prázdne znaky a tým sme vyhradili určitý priestor. Náš problém tento prístup nevyriešil, ale bol to krok správnym smerom. Nakoniec sme sa rozhodli, že si pre každú meranú zložku potrebujeme určiť povolený rozsah hodnôt. Napríklad, povolíme otáčky motora v rozsahu 0 až 99 999. V skutočnom prostredí otáčky nikdy nedosiahnú najvyššiu hodnotu, ale takto môžeme požiadať widget o vyhradenie dostatku miesta, aby sa doňho vošli všetky zadané dáta za všetkých okolností.

label = ttk.Label(content_frame, style='Customized.Main.TLabel',
                  padding=(30, 10), width=10, anchor='center')
label.grid(column=1, row=9)
label.grid_remove()

Šírku widgetu môžete určiť pomocou parametra width. Viac informácií nájdete v dokumentácii. Text môžete umiestniť do stredu widgetu atribútom anchor. Predpokladáme, že vstup je menší ako najvyššia povolená hodnota. Teraz vyzerá UI oveľa príjemnejšie.

Ďalšia vec, ktorú sme potrebovali poriešiť, bola zmena viditeľnosti widgetu. Počas ručného merania zobrazuje rozhranie čas zachytenia dát a vedľa neho symbol M. Potom po dvoch sekundách je táto informácia skrytá. V tomto prípade nám prichádza na pomoc manažér geometrie!

label = ttk.Label(content_frame, style='Customized.Main.TLabel',
                  padding=(30, 10), width=10, anchor='center')

## Umiestnenie a zobrazenie widgetu na obrazovke
label.grid(column=1, row=9)

# Nastavenie widgetu ako neviditeľného
label.grid_remove()

# A nastavenie ho späť ako viditeľného
label.grid()

Všimnite si, že keď chceme nastaviť widget ako viditeľný, nepotrebujeme znovu zadávať pozíciu v mriežke. Manažér mriežky (grid manager) si pamätá posledné umiestnenie widgetu a okamžite ho zobrazí.

Architektúra

Naša aplikácia pozostáva z niekoľkých tried, ako môžete vidieť na diagrame nižšie.

Celková architektúra

Trieda Stopwatch (stopky) slúži na ovládanie hodín pri vstupe od používateľa a na spustenie merania. Trieda FlowMeter (merač prietoku) nepretržite meria prietok vody. Trieda PressureTransducer (snímač tlaku) meria tlak z dvoch pripojených senzorov, a trieda RpmMeter (merač otáčok) meria otáčky motora. Všetky tieto triedy posielajú správy do komponentu MainApp, ktorý obsluhuje slučku s udalosťami (event loop) a všetko spracovanie rieši na hlavnom UI vlákne. Pokiaľ skúsite aktualizovať UI widget z vlákna na pozadí, vaša aplikácia pravdepodobne spadne. V Androide to funguje veľmi podobne.

Spracovanie GPIO vstupov s knižnicou gpiozero

Gpiozero je malá knižnica, ktorá abstrahuje operácie nad GPIO. Pôvodne bola postavená nad knižnicou RPi.GPIO, ale časom vývojári pridali podporu aj pre iné knižnice. Umožňuje, okrem iného, použiť GPIO na diaľku vo vašom PC, ako keby mal GPIO konektor priamo na jeho doske.

Aby sme spracovali GPIO vstupy, použili sme komponent Button (tlačidlo). Tento komponent vytvára svoje vlastné vlákno na pozadí len aby mohlo spracovávať GPIO vstupy. Neustále zisťuje zmeny vstupu a keď dôjde k zmene, zavolá vašu funkciu. Komponent Button nepoužívame len na spracovanie stlačení tlačidla, ale pre akýkoľvek vstup signálu. Pamätajte, že tlačidlo je len abstrakcia a knižnica umožňuje nakonfigurovať vstupný pin tak, ako potrebujeme.

Nasledujúci kód ukazuje, ako viete tlačidlu priradiť vstupný pin a registrovať v ňom vašu funkciu ako callback:

_STOPWATCH_TRIGGER_PIN = 26

start_button = Button(self._STOPWATCH_TRIGGER_PIN, pull_up=True, bounce_time=0.1)
start_button.when_pressed = lambda: self._start_watch()

def _start_watch(self):
    # Kód, ktorý sa spustí
    pass

Váš callback potrebujete pripojiť do vlastnosti when_pressed. Tento komponent pre tlačidlo očakáva funkciu so špecifickými parametrami. Ideálne by ste mu mali dať odkaz na funkciu bez vstupného parametra alebo môžete odovzdať funkciu, ktorá očakáva jeden vstupný parameter. Keď bude zavolaná, bude mať správny odkaz na objekt Button.

Tlačidlá používame vnútri tried. Ich funkcie majú self ako vstupný parameter. Aby to mohlo fungovať, používame anonymnú lambda funkciu, ktorá zavolá potrebný kód.

S týmto prístupom sme mohli vybudovať naše komponenty od základu. Komponent pre stopky priamo obsluhuje vstupné signály na ovládanie hodín, RpmMeter sa spustí zakaždým, keď príjme impulz zo senzora v motore, atď.

Vzdialené GPIO

Gpiozero podporuje vzdialené GPIO. Nebudeme zachádzať do podrobností. Len vám povieme, že je to veľmi pohodlné a funguje to s Python kódom vo vašom počítači namiesto priamo v Raspberry Pi. Keď to raz skúsite, už o túto funkciu nechcete prísť. Svojím spôsobom to pripomína prácu s Arduino doskami na počítači. Hoci je Raspberry Pi dostatočne výkonné na mnoho úloh, vývoj kódu na oveľa výkonnejšom počítači šetrí čas.

Ale keď skúsite spustiť skripty napísané pre Raspberry Pi na počítači, niektoré knižnice takéto prostredie neočakávajú a vyhodia výnimku. Preto sme problematický kód vložili do „try…except“ bloku a funkcie, ktoré takéto knižnice používajú, sme vypli.

Jedným príkladom je I2C komunikácia pomocou knižnice Busio z balíka CircuitPython. Pretože táto knižnica na počítači nepobeží, cez I2C nedostaneme dáta z tlakového senzora, ale všetko ostatné funguje bez problémov.

Tento prístup nám umožnil odladiť a vyriešiť väčšinu problémov na počítači a stráviť vývojom priamo na Raspberry Pi iba potrebný čas.

Adafruit CircuitPython

CircuitPython je odnož MicroPythonu od firmy Adafruit. Ide o verziu Pythonu pre mikrokontroléry. Predtým, keď ste si kúpili senzor od Adafruitu, poskytli vám knižnice a návody pre plnohodnotnú verziu Pythonu, ako aj pre MicroPython. Od tohto prístupu upustili, pretože kód sa horšie udržiava. Čo namiesto toho Adafruit odporúča, je použiť ich vrstvu pre kompatibilitu, ktorá sa nazýva Blinka. Pomocou tejto knižnice môžete použiť ich kód napísaný pre CircuitPython, a toto je prístup, ktorý sme si vybrali aj my.

Slučka s udalosťami (Event Loop)

Keď sa vyskytne nejaká udalosť, napr. je stlačená spúšť hodín, komponent, v tomto prípade, StopWatch, spracuje túto udalosť a oznámi to komponentu MainApp zavolaním funkcie MainApp#post_on_ui_thread(event). MainApp obsluhuje slučku s udalosťami, ktorá spracováva všetky udalosti a obnovuje UI podľa potreby. Len pripomíname, že UI je potrebné obsluhovať len cez UI vlákno. Vnútorne používame triedu pre synchronizovaný front (Queue).

# Front pre UI vlákno na aktualizáciu komponentov
self._thread_queue = queue.Queue()

Dáta do frontu môžete pridať takto:

def post_on_ui_thread(self, value):
    self._thread_queue.put(value)

A takto ich môžete čítať počas obnovy UI:

def _update_ui(self):
    try:
        event = self._thread_queue.get(False)

        # Udalosti bez dát
        if type(event) == str:
            if event == StopWatch.STOPWATCH_RESET:
                # Resetuj hodiny
                pass
            if event == StopWatch.MANUAL_MEASURE_ENDED:
                # Urob nejakú prácu
                pass

        # Udalosti s dátami v slovníkoch (kľúč = hodnota)
        elif type(event) == dict:
            checkpoint = None

            for eventKey, eventValue in event.items():
                if eventKey == StopWatch.SPLIT_TIME_MEASURED:
                    # Zobraz na obrazovke medzičas
                    pass

    except queue.Empty:
        pass

    # Tento riadok sa postará o to, že funkcia pre aktualizáciu sa bude volať pravidelne,
    # aby spracovávala udalosti z frontu. SCREEN_REFRESH_MS nám umožňuje presne kontrolovať,
    # ako často sa táto funkcia bude volať.
    self._parent.after(self._SCREEN_REFRESH_MS, self._update_ui)

Spracovanie dát zo senzorov

Senzory pripojené k Raspberry Pi nám neposkytujú hodnoty, ktoré by sme mohli priamo použiť. Namiesto toho si musíme vypočítať, čo potrebujeme. Postupne ako prichádzajú vzorky, ich ukladáme do cyklického frontu. Na tento účel používame triedu deque z modulu collections.

Napríklad, ak chcete uložiť posledných desať vzoriek podľa toho, ako prišli, inicializujte triedu deque následovne:

self._samples = deque(maxlen=10)

Keď príde nová vzorka, pridá sa na koniec frontu, čím sa zahodí najstaršia vzorka.

self._samples.append(time.time())

S týmto frontom pracujete ako so štandardným zoznamom, ktorý má desať prvkov. Krása!

Výpočet otáčok za minútu z impulzov

Náš senzor sa nachádza priamo v motore. Generuje impulz zakaždým, keď sviečka hodí iskru. Na GPIO pine dostávame rýchle impulzy. Čím rýchlejšie sú, tým vyššie sú otáčky. Zachytávame čas (v milisekundách) príchodu každej vzorky. Odtiaľ vieme vypočítať časový rozdiel medzi vzorkami a následne vypočítať frekvenciu. A nakoniec vieme odvodiť z tejto frekvencie počet otáčok za minútu.

freq = 1 / ((self._samples[-1] - self._samples[0]) / self._MAX_QUEUE_LENGTH) / self._k_multiplier
    rpm = int(freq * 60)

V tejto chvíli počítame časový rozdiel medzi desiatimi vzorkami. Dokonalá vzorkovacia presnosť na Raspberry Pi nie je zaručená a tento prístup nám pomôže aspoň čiastočne tento problém vyriešiť.

Výpočet prietoku vody z impulzov

Svojím spôsobom je to podobné s predchádzajúcim prípadom, pretože taktiež na vstupe dostávame impulzy. Aby sme vypočítali prietok v litroch za sekundu, potrebujeme len inú rovnicu.

freq = 1 / ((self._samples[-1] - self._samples[0]) / self._MAX_QUEUE_LENGTH)
    lpm = int(self._k * (freq + self._q))

Výpočet tlaku

Získanie údajov o tlaku je zložitejšie. Tlak môže veľmi rýchlo narásť aj klesnúť. Preto potrebujeme vypočítať jeho kĺzavý priemer.

Implementácia pre tento modul sa líši. Senzor, ktorý používame, komunikuje po zbernici I2C. V jeho vlákne nepoužívame komponent Button. Namiesto toho potrebujeme vytvoriť naše vlastné vlákno na pozadí, ktoré bude periodicky zbierať zo senzora nové vzorky. V tomto vlákne beží nasledujúci kód.

def _update_sliding_avg_pressure_thread(self):
    if self._i2c_initialized:
        self._is_measuring = True
        self._voltage_1_samples.append(self._adc_channels[0].voltage)
        self._voltage_2_samples.append(self._adc_channels[1].voltage)
        self._is_measuring = False

Keď neskôr budeme chcieť vedieť tlak, vypočítame jeho hodnotu následovne:

def get_sliding_avg_pressure(self):
    # Kĺzavý priemer sa vypočíta z _MAX_QUEUE_LENGTH vzoriek
    avg_p1 = sum(self._voltage_1_samples) / self._avg_samples_no
    avg_p2 = sum(self._voltage_2_samples) / self._avg_samples_no
    return tuple(map(self._calculate_pressure_from_input_value, [avg_p1, avg_p2]))

Konfiguračný súbor

Keď sa pozriete bližšie, uvidíte, že v našich vzorcoch používame rôzne používateľom definované konštanty. Tie sú definované v konfiguračnom súbore, ktorý sa načíta pri inicializácii aplikácie. Tento konfiguračný súbor nám dáva flexibilitu, ktorú potrebujeme. V našom kóde sme si zadefinovali záložné predvolené hodnoty.

Problémy

A teraz k tej zábavnej časti! Niektoré z tých menej významných problémov sme už spomenuli. Tie sme už vyriešili. Poďme sa však teraz v krátkosti pozrieť na problémy, ktoré sme vyriešili len čiastočne.

Otáčky motora kolíšu pri ustálenom toku impulzov

Keď generujeme impulzy so špecifickou frekvenciou, vypočítané otáčky za minútu by sa nemali meniť. V skutočnosti tomu tak však nie je. Počas testovania naša hodnota otáčok za minútu kolísala v rozsahu +-100, čo znamená, že ak motor beží na 10 000 otáčkach, uvidíte hodnoty v rozsahu 9 950 až 10 050. Nakoniec sme dospeli k názoru, že pre tento typ projektu je takáto presnosť prijateľná. To však neznamená, že sme s tým spokojní.

Prietok vody taktiež kolíše

Problém veľmi podobný tomu vyššie. Akurát, že prietok vody kolíše v menšom rozsahu.

Zmena tlaku sa neprejaví okamžite

Zmena napätia sa neprejavila okamžitou zmenou tlaku. Asi by sme mohli viac odladiť výpočet tlaku. Zmena veľkosti cyklického frontu by mohla byť efektívnym riešením. Avšak v našom prípade je to takto prijateľné.

Protokolovanie (logovanie) značne spomaľuje zber dát

V našom kóde vo veľkom používame modul logging. Na začiatku sme nastavili, aby zobrazoval všetky správy s prioritou DEBUG a vyššou. Ukázalo sa, že je to problém, ak sa logovanie vyskytuje vo vláknach, ktoré vzorkujú dáta. Nakoniec sme veci urýchlili tým, že sme nastavili náš logger, aby vypisoval iba upozornenia a chybové správy.

Riešenie

Keď sa pozrieme na problémy, vidíme istý vzorec. Presnosť vzorkovania dáť je ovplyvnená, pretože nepoužívame hardvér, ktorý bol navrhnutý na prácu v reálnom čase. Taktiež Raspbian nemá linuxové jadro odladené na prácu v reálnom čase. Všetky vlákna riadi plánovač v linuxovom jadre. Nemáte záruku, že vlákno na pozadí bude bežať na vyhradenom jadre procesora po celý čas, bez prerušenia. Toto je ten dôvod, prečo nedostávame vzorky dát v pravidelných intervaloch.

Občas Raspberry vzorky nezachytí alebo zaznamená čas ich príchodu s miernym posunom. Keď pracujete s relatívne vyššími frekvenciami, už na tom záleží. Takéto malé rozdiely spôsobujú kolísanie otáčok, ktoré nie je zanedbateľné.

Keby sme v budúcnosti začali pracovať na podobnom projekte ako je tento, asi by sme zvážili použiť vyhradený mikrokontrolér len na to, aby zbieral všetky vzorky v presnom čase a počítal cieľové hodnoty v čase jeho nečinnosti.

Arduino mikrokontroléry vyzerajú byť ako správna voľba. Podporujú prerušenia a preto môžeme pripojiť rutinu, ktorá bude spracovávať vzorky s najvyššou prioritou. A keď sa práve nespracováva žiadna vzorka, tento mikrokontrolér má stále dostatok strojového času, aby robil všetky tie výpočty. Koniec-koncov, nebeží na ňom žiadny operačný systém.

Keď Arduino modul zozbiera dáta, môže pravidelne posielať súhrnné správy na Raspberry Pi, ktoré zobrazí obsah na obrazovke. Aj napriek zopár zádrhelom považujeme stále Raspberry Pi za perfektný nástroj na obsluhu takýchto jednoduchých UI.

Záver

Možnosť mať GPIO na Raspberry Pi je skvelá. Môžete pracovať na mnohých problémoch bez toho, aby ste narazili na problémy ako sme mali my. Ponaučenie z tohto príbehu je také, že ak potrebujete najvyššiu presnosť, riešte to na hardvéri, ktorý bol na tento účel navrhnutý.

Pracovali ste už niekedy na projekte ako je tento? Narazili ste na niektoré zo spomenutých problémov? A ako ste ich vyriešili? Napíšte nám to do komentára.

Spread the word

Keep reading