以前作った,エアコン室外機冷却システムを Kubernetes ベースで作り直してみました.
モチベーション
昨年作ったシステムは,期待通りの働きはしてくれたものの以下の課題がありました.
- Raspberry Pi にソフトを書き込まないと動作確認できない
- ソフトの更新作業が繁雑で更新中動作が停止してしまう
- 稼働状況が分かりにくい
これらを,次のようにして改善します.
- Kubernetes クラスタを前提とした構成にソフトを再構築
- 電磁弁を制御するコンポーネントと制御量を算出するコンポーネントを分離
- Web インターフェース作成
改善後のシステム構成
改善後のシステム構成は以下.
Raspberry Pi は右側の四角部分になり,冷却に関わる制御量を受け取って電磁弁を制御するアクチュエータの機能を担います.分析用に噴霧した水量のモニタも行います.ソフトは Raspberry Pi に直接書き込むのではなく,Kubernetes でノード指定をした Deployment で配置します.
冷却に関わる制御量コントローラ,同じく Kuberbetes を使って別のノードに配置しています.
コード全体は下記においています.
以降では,先ほどの3 つのポイントを順に説明します.
Kubernetes クラスタを前提としたソフト構成
こちらで必要になる要素を下記の3つのエントリーで説明しています.これらを組み合わせることで,Git にコミットするだけで,自動的にソフトを更新して公開できるようになります.
アクチュエータについては,RPI.GPIO
や流量計の動きを模擬する下記のようなクラスを追加し,GitLab CI 等での自動テストを可能にしています.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
if os.environ.get("DUMMY_MODE", None) is None: import RPi.GPIO as GPIO import fd_q10c else: logging.warning("Using dummy GPIO") # NOTE: 本物の GPIO のように振る舞うダミーのライブラリ class GPIO: IS_DUMMY = True BCM = 0 OUT = 0 state = 0 def setmode(mode): return def setup(gpio, direction): return def output(gpio, value): GPIO.state = value return def input(gpio): return GPIO.state def setwarnings(warnings): return class fd_q10c: def sense(force_power_on=True): if GPIO.state == VALVE_STATE.OPEN.value: return 1.23 else: return 0 def stop(): return |
コンポーネントの分離
電磁弁の制御を担う Raspberry Pi と制御量を算出するサーバは,ZeroMQ を使った Pub-Sub パターンで分離しました.
RPC的な仕組みを使おうか迷ったのですが,ZeroMQ を使った Pub-Sub パターンだと下記のようなことが,異常系を考慮することなく簡単に実現できるので良かったです.
- アクチュエータ,コントローラの起動順番の変更.片方の再起動.
- これにより,ソフトをデプロイする際にのシステム停止時間が最小化されます.コントローラを更新する場合,その間冷却動作は問題なく継続できます.
- アクチュエータ側の複数配置
- これにより,アクチュエータのデプロイ前,実際に稼働しているコントローラと接続して,テストを行うことができます.GitLab CI を設定すれば,テストに成功した場合のみ自動的にデプロイすることもできます.
実際のコードはこんな感じです.異常処理が一切無い簡潔なコードですが,これで問題なく安定稼働しています.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
import zmq import json import logging import time CH = "unit_cooler" def start_server(server_port, func, interval_sec): logging.info("Start control server (port: {port})...".format(port=server_port)) context = zmq.Context() socket = context.socket(zmq.PUB) socket.bind("tcp://*:{port}".format(port=server_port)) logging.info("Server initialize done.") while True: start_time = time.time() socket.send_string("{ch} {json_str}".format(ch=CH, json_str=json.dumps(func()))) sleep_sec = interval_sec - (time.time() - start_time) logging.debug("Seep {sleep_sec:.1f} sec...".format(sleep_sec=sleep_sec)) time.sleep(sleep_sec) def start_client(server_host, server_port, func): logging.info("Start control client...") context = zmq.Context() socket = context.socket(zmq.SUB) target = "tcp://{host}:{port}".format(host=server_host, port=server_port) socket.connect(target) socket.setsockopt_string(zmq.SUBSCRIBE, CH) logging.info("Client initialize done.") while True: ch, json_str = socket.recv_string().split(" ", 1) json_data = json.loads(json_str) logging.debug("recv {json}".format(json=json_data)) func(json_data) |
Web インターフェース作成
下記を表示できるようにしています.
- 散布した水量
- 冷却モード
- 散布パターンを決めるのに使っているセンサーデータ(エアコン電力,気象条件)
- 作動ログログ
画面はこんな感じ.
動いている様子を下記で見てもらえます.センサーデータ等は1週間前の値になっています.
基本的に Web インターフェースは制御量算出の入出力を表示しているだけですが,「本日の散水量」だけは追加で準備しました.実は昨年は,散布量と水道料金のバランスを取るのに少し苦労したので,改善するのが目的です.
電磁弁を制御する Raspberry Pi で流量をモニタし,これを Fluentd 経由で InfluxDB に格納しておくことで実現しています.
InfluxDB に入っているデータは,Flux と呼ばれるクエリ言語を使って集計して取り出せます.aggregateWindow
と reduce
を使って,1分ごとの流量平均値を計算した上で積算することで,散布量が算出できます.
コードはこんな感じです.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
FLUX_SUM_QUERY = """ from(bucket: "{bucket}") |> range(start: {start}, stop: {stop}) |> filter(fn:(r) => r._measurement == "{measure}") |> filter(fn: (r) => r.hostname == "{hostname}") |> filter(fn: (r) => r["_field"] == "{field}") |> aggregateWindow(every: {every}m, offset:-{every}m, fn: mean, createEmpty: {create_empty}) |> filter(fn: (r) => exists r._value) |> fill(usePrevious: true) |> reduce( fn: (r, accumulator) => ({{sum: r._value + accumulator.sum, count: accumulator.count + 1}}), identity: {{sum: 0.0, count: 0}}, ) """ def get_day_sum(config, measure, hostname, field, offset_day=0): try: every_min = 1 window_min = 5 now = datetime.datetime.now() start = "-{offset_day}d{hour}h{minute}m".format( offset_day=offset_day, hour=now.hour, minute=now.minute ) stop = "-{offset_day}d".format(offset_day=offset_day) table_list = fetch_data_impl( config, FLUX_SUM_QUERY, measure, hostname, field, start, stop, every_min, window_min, True, ) value_list = table_list.to_values(columns=["count", "sum"]) if len(value_list) == 0: return 0 else: return value_list[0][1] except: logging.warning(traceback.format_exc()) return 0 |
コメント