前回からの続きです.実際にスマートメータに無線で接続します.
使用機材
スマートメータとは Wi-SUN という無線通信規格で通信を行う必要があります.ここを見ると対応モジュールはいくつかあるようですが,個人が入手できるものはロームの BP35Cx Series 位しかありません.
というわけで,以下を入手します.
- BP35A1
-
アンテナ内蔵の Wi-SUM モジュールです.ホストとは UART で通信を行います.
- BP35A7A
-
インターフェース基板です.
上記のモジュールには,B to B のコネクタしか載っていないので,これを使ってブレッドボードや基板に実装しやすい形に変換します.
- BP35A7-ACCESSORIES
-
BP35A1 と BP35A7A を固定するためのアクセサリです.
B to B のコネクタを傷めないためにもあった方がよいです.
ちなみに,現時点だとみんな BP35A1 を使っているので,ネット検索するといろんな接続事例が見つけられます.
参考資料
- ECHONET Lite 通信ミドルウェア仕様 [キャッシュ]
- スマートメータとの通信に使用する ECHONET Lite プロトコルの通信仕様が規定されています.
「第3章 電文構成(フレームフォーマット)」に目を通すと,全体像がつかめます.プロトコルとしては,機器が持っているオブジェクトのプロパティを読み書きする仕組みになっています. - ECHONET 機器オブジェクト詳細規定 [キャッシュ]
- 機器の種類毎に,対応するオブジェクトが対応しているプロパティが規定されています.
スマートメータについては「低圧スマート電力量メータ」として規定されています. - BP35A1 コマンドリファレンスマニュアル [キャッシュ]
- BP35A1 との間の通信仕様が規定されています.
メータとの間の通信を確立した後,SKSENDTO および ERXUDP を使って ECHONET Lite プロトコルに従った通信を行います.
回路
BP35A1 と UART で接続するだけで OK.こんな感じ.
プログラム
作ったプログラムはこちら.UART で BP35A1 と接続した状態で実行すると現在の消費電力を出力します.
よくあるプログラムとの最大の違いは,スマートメータとの間の通信チャンネルを毎回探しにいかないこと.ネットワークを探しにいくと,スマートメータとの通信を確立するまでに10秒程度かかってしまいますが,このスクリプトの場合,実行してから 3 秒ほどで消費電力を取得できます.
|
#!/usr/bin/env python # -*- coding: utf-8 -*- import serial import sys import struct import pprint class BP35A1: def __init__(self, port, debug=False): self.ser = serial.Serial( port=port, baudrate=115200, timeout=1 ) self.opt = None self.debug = debug self.__send_command('SKRESET') def get_option(self): ret = self.__send_command('ROPT') val = int(ret, 16) self.opt = val def set_id(self, b_id): command = 'SKSETRBID {0}'.format(b_id) self.__send_command(command) def set_password(self, b_pass): command = 'SKSETPWD {0:X} {1}'.format(len(b_pass), b_pass) self.__send_command(command) def scan_channel(self, start_duration=5): duration = start_duration pan_info = None while True: command = 'SKSCAN 2 {0:X} {1}'.format((1 << 32) - 1, duration) self.__send_command(command) while True: line = self.ser.readline() # スキャン完了 if line.startswith('EVENT 22'): break # メータ発見 if line.startswith('EVENT 20'): pan_info = self.__parse_pan_desc() if pan_info != None: return pan_info duration += 1 if duration > 7: return None def connect(self, pan_desc): command = 'SKSREG S2 {0}'.format(pan_desc['Channel']) self.__send_command(command) command = 'SKSREG S3 {0}'.format(pan_desc['Pan ID']) self.__send_command(command) command = 'SKLL64 {0}'.format(pan_desc['Addr']) ipv6_addr = self.__send_command_raw(command) command = 'SKJOIN {0}'.format(ipv6_addr) self.__send_command(command) while True: line = self.ser.readline() # 接続失敗 if line.startswith('EVENT 24'): return None # 接続成功 if line.startswith('EVENT 25'): return ipv6_addr def recv_udp(self, ipv6_addr, wait_count=10): for i in xrange(wait_count): line = self.ser.readline().rstrip() if line == '': continue line = line.split(' ', 9) if line[0] != 'ERXUDP': continue if line[1] == ipv6_addr: return line[8] return None def send_udp(self, ipv6_addr, port, data, handle=1, security=True): command = 'SKSENDTO {0} {1} {2:04X} {3} {4:04X} {5}'.format( handle, ipv6_addr, port, 1 if security else 2, len(data), data ) self.__send_command_raw( command, lambda command: ' '.join(command.split(' ', 7)[0:6]) ) status = 0 while self.ser.readline().rstrip() != 'OK': def __parse_pan_desc(self): self.__expect('EPANDESC') pan_desc = {} for i in xrange(6): line = self.ser.readline() if not line.startswith(' '): raise Exception("Line does not start with space.\nrst: %s" % line) line = line.strip().split(':') pan_desc[line[0]] = line[1] return pan_desc def __send_command_raw(self, command, echo_back=lambda command: command): if self.debug: sys.stderr.write("SEND: %s\n" % pprint.pformat(command)) self.ser.write(command + "\r") # NOTE: echo_back はコマンドからエコーバック文字列を生成する関数. # デフォルトはコマンドそのもの. self.__expect(echo_back(command)) return self.ser.readline().rstrip() def __send_command(self, command): ret = self.__send_command_raw(command) ret = ret.split(' ', 1) if ret[0] != 'OK': raise Exception("Status is not OK.\nrst: %s" % ret[0]) return None if len(ret) == 1 else ret[1] def __expect(self, text): line = self.ser.readline() if line.rstrip() != text: raise Exception("Echo back is wrong.\nexp: %s\nrst: %s" % (text, line.rstrip())) class ECHONETLite: UDP_PORT = 3610 EHD1 = 0x10 class EHD2: FORMAT1 = 0x81 FORMAT2 = 0x82 class ESV: # プロパティ値書き込み要求(応答不要) PROP_WRITE_NO_RES = 0x60 # プロパティ値書き込み要求(応答要) PROP_WRITE = 0x61 # プロパティ値読み出し要求 PROP_READ = 0x62 # プロパティ値通知要求 PROP_NOTIFY = 0x62 # プロパティ値書き込み・読み出し要求 PROP_WRITE_READ = 0x6E class EOJ: # 住宅・設備関連機器クラスグループ CLASS_GROUP_HOUSING = 0x02 # 管理・操作関連機器クラスグループ CLASS_GROUP_MANAGEMENT = 0x05 class HOUSE_CLASS_GROUP: # 低圧スマート電力量メータクラス LOW_VOLTAGE_SMART_METER = 0x88 class MANAGEMENT_CLASS_GROUP: # コントローラ CONTROLLER = 0xFF class EPC: class LOW_VOLTAGE_SMART_METER: # 動作状態 STATUS = 0x80 # 積算電力量有効桁数 EFFECTIVE_DIGITS_OF_CUMULATIVE_ENERGY = 0xD7 # 積算電力量計測値(正方向計測値) CUMULATIVE_ENERGY_NORMAL_DIRECTION = 0xE0 # 積算電力量計測値(逆方向計測値) CUMULATIVE_ENERGY_REVERSE_DIRECTION = 0xE3 # 積算電力量単位(正方向、逆方向計測値) CUMULATIVE_ENERGY_UNIT = 0xE1 # 瞬時電力計測値 INSTANTANEOUS_ENERGY = 0xE7 # 瞬時電流計測値 INSTANTANEOUS_CURRENT = 0xE8 # 定時積算電力量計測値(正方向計測値) CUMULATIVE_ENERGY_FIXED_TIME_NORMAL_DIRECTION = 0xEA # 定時積算電力量計測値(逆方向計測値) CUMULATIVE_ENERGY_FIXED_TIME_REVERSE_DIRECTION = 0xEB @classmethod def parse_frame(cls, packet, is_binary=True): frame = {} # ヘッダ frame['EHD1'] = struct.unpack('B', packet[0])[0] frame['EHD2'] = struct.unpack('B', packet[1])[0] frame['TID'] = struct.unpack('>H', packet[2:4])[0] if (frame['EHD2'] == cls.EHD2.FORMAT1): frame['EDATA'] = cls.parse_data(packet[4:]) cls.validate_header(frame) return frame @classmethod def validate_header(cls, frame): if frame['EHD1'] != cls.EHD1: raise Exception('Invalid EHD1: %d' %frame['EHD1']) if (frame['EHD2'] != cls.EHD2.FORMAT1) and \ (frame['EHD2'] != cls.EHD2.FORMAT2): raise Exception('Invalid EHD2: %d' %frame['EHD2']) @classmethod def parse_data(cls, packet): data = {} data['SEOJ'] = struct.unpack('>I', chr(0) + packet[0:3])[0] data['DEOJ'] = struct.unpack('>I', chr(0) + packet[3:6])[0] data['ESV'] = struct.unpack('B', packet[6])[0] data['OPC'] = struct.unpack('B', packet[7])[0] prop_list = [] packet = packet[8:] for i in xrange(data['OPC']): prop = {} prop['EPC'] = struct.unpack('B', packet[0])[0] prop['PDC'] = struct.unpack('B', packet[1])[0] if prop['PDC'] == 0: prop['EDT'] = None else: prop['EDT'] = packet[2:(2+prop['PDC'])] prop_list.append(prop) data['prop_list'] = prop_list return data @classmethod def parse_inst_list(cls, packet): count = struct.unpack('B', packet[0])[0] packet = packet[1:] inst_list = [] for i in xrange(count): inst_info = {} inst_info['class_group_code'] = struct.unpack('B', packet[0])[0] inst_info['class_code'] = struct.unpack('B', packet[1])[0] inst_info['instance_code'] = struct.unpack('B', packet[2])[0] inst_info['EOJ'] = struct.unpack('>I', chr(0) + packet[0:3])[0] inst_list.append(inst_info) packet = packet[3:] return inst_list @classmethod def check_class(cls, inst_list, eoj): for inst_info in inst_list: if ((inst_info['EOJ'] & 0xFFFF00) == (eoj & 0xFFFF00)): return True return False @classmethod def build_frame(cls, edata, tid=1): return struct.pack('2B', cls.EHD1, cls.EHD2.FORMAT1) + \ struct.pack('>H', tid) + edata @classmethod def build_edata(cls, seoj, deoj, esv, prop_list): seoj_data = struct.pack('>I', seoj)[1:] deoj_data = struct.pack('>I', deoj)[1:] esv_data = struct.pack('B', esv) opc_data = struct.pack('B', len(prop_list)) edata = seoj_data + deoj_data + esv_data + opc_data for prop in prop_list: prop_data = cls.build_prop(prop['EPC'], prop['PDC'], prop.get('EDT')) edata += prop_data return edata @classmethod def build_eoj(cls, class_group_code, class_code, instance_code=0x1): return (class_group_code << 16) | (class_code << 8) | instance_code @classmethod def build_prop(cls, epc, pdc, edt): prop = struct.pack('2B', epc, pdc) if pdc != 0: prop += edt return prop class EchonetEnergy: def __init__(self, echonet_if, b_id, b_pass, debug=False): echonet_if.set_id(b_id) echonet_if.set_password(b_pass) self.echonet_if = echonet_if self.ipv6_addr = None self.eoj = ECHONETLite.build_eoj( ECHONETLite.EOJ.CLASS_GROUP_HOUSING, ECHONETLite.EOJ.HOUSE_CLASS_GROUP.LOW_VOLTAGE_SMART_METER ) def get_pan_info(self): return self.echonet_if.scan_channel() def connect(self, pan_info): self.ipv6_addr = self.echonet_if.connect(pan_info) if self.ipv6_addr == None: raise Exception('Faile to connect Wi-SUN') recv_packet = self.echonet_if.recv_udp(self.ipv6_addr) frame = ECHONETLite.parse_frame(recv_packet) # インスタンスリスト inst_list = ECHONETLite.parse_inst_list( frame['EDATA']['prop_list'][0]['EDT']) # 低圧スマート電力量メータクラスがあるか確認 if not ECHONETLite.check_class(inst_list, self.eoj): raise Exception('Meter not fount') def get_current_energy(self): edata = ECHONETLite.build_edata( ECHONETLite.build_eoj( ECHONETLite.EOJ.CLASS_GROUP_MANAGEMENT, ECHONETLite.EOJ.MANAGEMENT_CLASS_GROUP.CONTROLLER ), self.eoj, ECHONETLite.ESV.PROP_READ, [ { 'EPC': ECHONETLite.EPC.LOW_VOLTAGE_SMART_METER.INSTANTANEOUS_ENERGY, 'PDC': 0, } ] ) send_packet = ECHONETLite.build_frame(edata) while True: self.echonet_if.send_udp(self.ipv6_addr, ECHONETLite.UDP_PORT, send_packet) recv_packet = self.echonet_if.recv_udp(self.ipv6_addr) frame = ECHONETLite.parse_frame(recv_packet) if frame['EDATA']['SEOJ'] != self.eoj: continue for prop in frame['EDATA']['prop_list']: if prop['EPC'] != \ ECHONETLite.EPC.LOW_VOLTAGE_SMART_METER.INSTANTANEOUS_ENERGY: continue return struct.unpack('>I', prop['EDT'])[0] import pickle import json import os.path # B ルート ID (32文字) b_id = '00000000000000000000000000000000' # B ルートパスワード (12文字) b_pass = '000000000000' # ネットワーク情報のキャッシュファイル PAN_DESC_DAT = 'pan_desc.dat' # PAN ID の探索は時間がかかるので,キャッシュしておく def get_pan_info(energy_meter): if (os.path.exists(PAN_DESC_DAT)): with open(PAN_DESC_DAT, mode='rb') as f: return pickle.load(f) else: pan_info = energy_meter.get_pan_info() with open(PAN_DESC_DAT, mode='wb') as f: pickle.dump(pan_info, f) return pan_info echonet_if = BP35A1('/dev/ttyAMA0', True) energy_meter = EchonetEnergy( echonet_if, b_id, b_pass ) pan_info = get_pan_info(energy_meter) energy_meter.connect(pan_info) print( json.dumps( { 'energy': energy_meter.get_current_energy() } ) ) |
実行結果はこんな感じ.4秒ほどで取得できます.
1 2 3 4 5 6 7 8 9 10 11 |
raspberrypi% time python smart_meter.py SEND: 'SKRESET' SEND: 'SKSETRBID 00000000000000000000000000000000' SEND: 'SKSETPWD C 000000000000' SEND: 'SKSREG S2 3B' SEND: 'SKSREG S3 5ADB' SEND: 'SKLL64 38E08E0000355ADB' SEND: 'SKJOIN FE80:0000:0000:0000:3AE0:8E00:0035:5ADB' SEND: 'SKSENDTO 1 FE80:0000:0000:0000:3AE0:8E00:0035:5ADB 0E1A 1 000E \x10\x81\x00\x01\x05\xff\x01\x02\x88\x01b\x01\xe7\x00' {"energy": 869} python smart_meter.py 0.62s user 0.12s system 18% cpu 3.956 total |
よりベターなコード
前記のスクリプト書いている最中,なんかモヤモヤしたものを感じていたのですが,下記のサイトで紹介されているコードをみて腑に落ちました.
スマートメータから瞬間消費電力を読むRubyのコード
https://lowreal.net/2016/08/09/1
私のコードの場合,シーケンシャルに特定のイベントが発生することを仮定してしまっていますが,こちらのコードでは非同期的に発生するイベントを別スレッドでハンドリングし,イベントの内容に応じて事前に登録されたハンドラを呼び出すことで ECHONET Lite の通信を実現しています.
プロトコルの性質を踏まえると,イベントベースにするのが自然で理にかなっています.
常時起動させるようなデーモンのようなコードにする場合,こちらのコードのような形にするのが良さそうです.
コメント
[…] 後編に続く. […]