Whisper + ChatGPT + ECHONET Lite で家電操作する音声アシスタント
※このページは、神奈川工科大学 コミュニケーションロボティクス研究室および 電気電子工学科/ホームエレクトロニクス開発学科の実習用チュートリアル用のページです。
1. はじめに
AIにどんな身体を持たせるか?AIの"知能"が急速に発展する中で、ますますおもしろさが増すテーマの一つです。
ここでは、
「家を身体に持つAI」
をテーマに、ChatGPTと連携して音声対話でスマートハウスを操作するシステムを構築します。スマートハウスには神奈川工科大学電気電子情報工学科の近未来室の設備を想定していますが、シミュレーターのスマートハウス環境でも学習できます。
2. 目的
音声入力を通じて、自然な会話で家庭内の家電を操作できるアシスタントを構築します。
構成要素は以下の通りです:
音声認識AI: Whisper API
意図解釈AI: ChatGPT(function calling)
制御送信: ECHONET Lite(UDP/3610)
※実行にはOpen AI のAPI キー(有料)が必要になります。
※※ ECHONET Liteについてはこちらを参考にしてください。
M5 Atome Lite・Atom Matrixでおうちハック! ECHONET Lite によるスマートハウス操作 入門編
※※※ Unity上のLive2Dアバターと連携させて萌家電をつくりたい人は、山崎までお問い合わせを
3. 対応する家電と機能
エアコン、電動ブラインド(プリーツスクリーン、ロールスクリーン)、照明の3種類の家電を操作することができます。
エアコン
機能: オン、オフ
IPアドレス:
シミュレーター上(ローカル):127.0.0.1
近未来室(E205):192.168.0.9
電動ブラインド / プリーツスクリーン / ロールスクリーン
機能: オン、オフ、開、閉、停止
DEOJ:
電動ブラインド(インスタンス1):026001  ※シミュレーター上ではカーテン
電動プリーツスクリーン(インスタンス2):026002
電動ロールスクリーン(インスタンス3):026003
呼称エイリアス:
「カーテン」→ blind1(電動ブラインド)を指す(※シミュレーターのカーテン対応)
IPアドレス:
シミュレーター上(ローカル):127.0.0.1
近未来室(E205):192.168.0.153
照明
機能:
オン、オフ
点灯モード設定
自動 = 0x41
通常灯 = 0x42
常夜灯 = 0x43
カラー灯 = 0x45
カラー灯モード時の RGB 設定(R, G, B 各1バイト)
IPアドレス:
シミュレーター上(ローカル):127.0.0.1
近未来室(E205):192.168.0.153
注意点:
RGB 設定は 必ず点灯モードをカラー灯(0x45)に設定後に送信
連続送信は100〜200msの間隔を空けると安定する
4. 実装のポイント
LLM 側のプロンプトルール
「ブラインド」「プリーツ」「ロール」「カーテン」から対象デバイスを選択
ブラインド系:
「開けて」「上げて」→ open
「閉めて」「下げて」→ close
「止めて」「ストップ」→ stop
照明: 色名(赤/青/緑/暖色/寒色など)も RGB にマッピング
ECHONET Lite 制御
ON/OFF: EPC=0x80(0x30=ON / 0x31=OFF)
ブラインド開閉停止: EPC=0xE0(開=0x41, 閉=0x42, 停止=0x43)
照明モード: EPC=0xB6(0x41〜0x45)
照明RGB: EPC=0xE0(R, G, B)
コマンド送信順序(例:照明を青に)
電源 ON(0x80 = 0x30)
点灯モードをカラー灯に(0xB6 = 0x45)
RGB 設定(0xE0 = 0,0,255)
→ 各コマンドの間に 0.15秒 インターバル
音声操作例
「エアコンつけて」→ aircon / on
「カーテン開けて」→ blind1 / open
「プリーツ下げて」→ blind2 / close
「ロールスクリーン止めて」→ blind3 / stop
「照明を常夜灯に」→ light / set_mode(0x43)
「ライトを赤でつけて」→ light / set_rgb(R=255,G=0,B=0)
改良ポイント(未実装)
GUI(Tkinter/Streamlit)で状態可視化と操作
MQTTブリッジで外部システム連携
時間帯ルール(例:夜は常夜灯のみ許可)
5. コード
上記の機能を実装したコードです。pythonで動きます。
IPアドレスはシミュレーターを同一PC上(ローカル環境)で実行したときを想定しています。自分の環境に合わせて変更してください。(近未来室の機器のIPアドレスは「3. 対応する家電と機能」の章に示しています。)
なお、実行にはOpen AI のAPI キー(有料)が必要になります。
下記の「YOUR-OPENAI_API_KEY」部分を自分の環境に合わせて書き換えてください。
OPENAI_API_KEY = "YOUR-OPENAI_API_KEY"
code:必要パッケージ
pip install sounddevice soundfile numpy openai-python
code:ChatELAssistant.py
import os
import io
import socket
import struct
import time
from dataclasses import dataclass
from typing import Optional, Literal, Dict, Any, List
import numpy as np
import sounddevice as sd
import soundfile as sf
from openai import OpenAI
# ========= ユーザー環境に合わせる設定 =========
OPENAI_API_KEY = "YOUR-OPENAI_API_KEY" # 自分の環境に合わせて書き換えること
SEND_GAP_SEC = 0.15 # 連続プロパティ送信のインターバル(100〜200ms推奨)
# デバイス一覧(IPはダミーで127.0.0.1)
DEVICES = {
"aircon": {
"name_ja": "エアコン", "クーラー", "冷房", "暖房", # 呼称の同義語
"ip": "127.0.0.1",
"deoj": bytes.fromhex("01 30 01"), # Home Air Conditioner (0x0130), instance 1
"features": "on", "off",
},
"blind1": {
"name_ja": "ブラインド", "電動ブラインド", "ブラインド1",
"ip": "127.0.0.1",
"deoj": bytes.fromhex("02 60 01"), # 電動ブラインド(インスタンス1)
"features": "on", "off",
},
"blind2": {
"name_ja": "プリーツスクリーン", "プリーツ", "スクリーン",
"ip": "127.0.0.1",
"deoj": bytes.fromhex("02 60 02"), # 電動プリーツスクリーン(インスタンス2)
"features": "on", "off",
},
"blind3": {
"name_ja": "ロールスクリーン", "ロール", "ロールスクリーン3",
"ip": "127.0.0.1",
"deoj": bytes.fromhex("02 60 03"), # 電動ロールスクリーン(インスタンス3)
"features": "on", "off",
},
"light": {
"name_ja": "照明", "ライト", "電気",
"ip": "127.0.0.1",
"deoj": bytes.fromhex("02 90 01"), # 一般照明(例) 0x0290 / instance 1
"features": "on", "off", "mode", "rgb",
},
}
# EPC割り当て(ご環境に合わせて変更可)
EPC_MAP = {
"operation_status": 0x80, # ON=0x30 / OFF=0x31
"light_mode": 0xB6, # 例:自動=0x41, 通常灯=0x42, 常夜灯=0x43, カラー灯=0x45
"light_rgb": 0xC0, # 例:R,G,B
"blind_motion": 0xE0, # ブラインド開閉停止
}
LIGHT_MODE_VALUES = {
"auto": 0x41,
"normal": 0x42,
"night": 0x43,
"color": 0x45,
# 日本語別名
"自動": 0x41,
"通常灯": 0x42,
"常夜灯": 0x43,
"カラー灯": 0x45,
}
#ブラインド開閉停止の値(代表例)
BLIND_MOTION_VALUES = {
"open": 0x41,
"close": 0x42,
"stop": 0x43,
# 日本語の言い換えも一応サポート
"開": 0x41, "あけ": 0x41, "開け": 0x41, "上げ": 0x41, "上": 0x41,
"閉": 0x42, "しめ": 0x42, "閉め": 0x42, "下げ": 0x42, "下": 0x42,
"停止": 0x43, "止め": 0x43, "ストップ": 0x43,
}
# ========= Audio録音 → Whisper =========
def record_audio(seconds=4, samplerate=16000) -> bytes:
print(f"REC {seconds}秒録音中… 話しかけてください。")
audio = sd.rec(int(seconds * samplerate), samplerate=samplerate, channels=1, dtype='float32')
sd.wait()
# WAVにエンコード(in-memory)
buf = io.BytesIO()
sf.write(buf, audio, samplerate, format='WAV', subtype='PCM_16')
buf.seek(0)
return buf.read()
def transcribe_whisper(wav_bytes: bytes) -> str:
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
# Whisper API(v1互換)
with io.BytesIO(wav_bytes) as f:
f.name = "input.wav"
return client.audio.transcriptions.create(
file=f,
model="whisper-1", # Whisper APIモデル名
response_format="text",
language="ja"
)
# ========= ChatGPTで意図をJSON化(function calling) =========
# 実運用では "gpt-4o-mini" など高精度/低レイテンシモデル推奨
LLM_MODEL = "gpt-4o-mini"
def llm_plan_command(utterance: str) -> Dictstr, Any:
"""
発話を解析し、以下いずれかのツール呼び出し相当のJSONを返す:
control_device(
device: "aircon" | "blind1" | "blind2" | "blind3" | "light",
action: "on" | "off" | "set_mode" | "set_rgb",
params?: { mode?: "auto"|"normal"|"night"|"color", r?: int, g?: int, b?: int }
)
"""
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
tools = [
{
"type": "function",
"function": {
"name": "control_device",
"description": "家電を制御するためのコマンドを確定する",
"parameters": {
"type": "object",
"properties": {
"device": {
"type": "string",
"enum": list(DEVICES.keys()),
"description": "対象デバイスID(aircon / blind1..3 / light)"
},
"action": {
"type": "string",
"enum": "on", "off", "set_mode", "set_rgb", "open", "close", "stop"
},
"params": {
"type": "object",
"properties": {
"mode": {
"type": "string",
"enum": "auto", "normal", "night", "color", "自動", "通常灯", "常夜灯", "カラー灯"
},
"r": {"type": "integer", "minimum": 0, "maximum": 255},
"g": {"type": "integer", "minimum": 0, "maximum": 255},
"b": {"type": "integer", "minimum": 0, "maximum": 255},
}
}
},
"required": "device", "action"
}
}
}
]
sys_prompt = """あなたはスマートホームの音声アシスタントです。
- 日本語の曖昧な表現を解決し、可能なら単一の確定コマンドにしてください。
- 「ブラインド」「プリーツ」「ロール」「カーテン」などの語から blind1/blind2/blind3 を選びます。
- 「カーテン」は blind1 を優先的に指すものとします。
- ブラインド系の「開けて/上げて」= open、「閉めて/下げて」= close、「止めて/ストップ」= stop。
- 照明の色は一般色名にも対応しRGBに写像してください。
- 可能ならON/OFFと同時に色/モードをまとめて1回のコマンドにしてください。
- 判断できない場合は、もっとも自然な意図を1つ選んでください(確認は行わない)。
- 通常の会話でも、気を利かせていい感じに家電を動かしてみてください。
"""
# 発話→デバイスIDヒント
# (雑に前処理して精度を上げる:あくまで補助)
device_hint = None
utt = utterance
for dev_id, meta in DEVICES.items():
for alias in meta"name_ja":
if alias in utt:
device_hint = dev_id
break
messages = [
{"role": "system", "content": sys_prompt},
{"role": "user", "content": f"発話: {utterance}\nヒントdevice: {device_hint or '不明'}"},
]
resp = client.chat.completions.create(
model=LLM_MODEL,
messages=messages,
tools=tools,
tool_choice="auto",
temperature=0.2,
)
choice = resp.choices0
if choice.finish_reason == "tool_calls" and choice.message.tool_calls:
tool_call = choice.message.tool_calls0
if tool_call.function.name == "control_device":
import json
return json.loads(tool_call.function.arguments)
# フォールバック(念のため)
return {"device": "light", "action": "on", "params": {}}
# ========= ECHONET Lite 送信(UDP/3610) =========
EL_PORT = 3610
def make_el_seti_frame(seoj: bytes, deoj: bytes, epc: int, edt: bytes, tid: int = 1) -> bytes:
"""
ECHONET Lite フレーム(SetI=0x60/ESV=0x61)生成
SEOJ/DEOJは3バイト、EPCは1バイト、EDTは0〜Nバイト
"""
# EHD1,EHD2
header = bytes(0x10, 0x81)
# TID 2B
tid_bytes = struct.pack(">H", tid & 0xFFFF)
esv = 0x61 # SetI (応答不要でOKなら0x60でもよい)
opc = 0x01 # プロパティ1件
epc_b = bytes(epc & 0xFF)
pdc = bytes(len(edt) & 0xFF)
frame = header + tid_bytes + seoj + deoj + bytes(esv, opc) + epc_b + pdc + edt
return frame
def send_el(ip: str, frame: bytes) -> None:
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
s.sendto(frame, (ip, EL_PORT))
PROFILE_SEOJ = bytes.fromhex("0E F0 01") # Node profile object
def op_onoff(value: Literal"on", "off") -> bytes:
epc = EPC_MAP"operation_status"
edt = b"\x30" if value == "on" else b"\x31"
return epc, edt
def op_light_mode(mode_str: str) -> bytes:
epc = EPC_MAP"light_mode"
v = LIGHT_MODE_VALUES.get(mode_str, LIGHT_MODE_VALUES.get(mode_str.lower(), 0x42)) # デフォルト通常灯
return epc, bytes(v)
def op_light_rgb(r: int, g: int, b: int) -> bytes:
epc = EPC_MAP"light_rgb"
return epc, bytes(r & 0xFF, g & 0xFF, b & 0xFF)
def op_blind_motion(kind: str) -> bytes:
epc = EPC_MAP"blind_motion"
v = BLIND_MOTION_VALUES.get(kind, BLIND_MOTION_VALUES"stop")
return epc, bytes(v)
# ========= 色名→RGB(簡易辞書) =========
COLOR_DICT = {
"赤": (255, 0, 0), "青": (0, 0, 255), "緑": (0, 255, 0), "白": (255, 255, 255),
"オレンジ": (255, 165, 0), "紫": (128, 0, 128), "ピンク": (255, 105, 180),
"黄色": (255, 255, 0), "水色": (0, 255, 255), "シアン": (0, 255, 255),
"マゼンタ": (255, 0, 255), "暖色": (255, 180, 100), "寒色": (100, 180, 255)
}
def coerce_color_params(params: Dictstr, Any) -> Dictstr, int:
# "color_name" → RGB の補助(LLMが色名を返した場合に備える)
if not params:
return {}
if all(k in params for k in ("r", "g", "b")):
return {"r": params"r", "g": params"g", "b": params"b"}
name = params.get("color") or params.get("name") or params.get("mode")
if isinstance(name, str) and name in COLOR_DICT:
r, g, b = COLOR_DICTname
return {"r": r, "g": g, "b": b}
return params
# ========= メインループ =========
def run_once():
wav = record_audio(seconds=4)
text = transcribe_whisper(wav)
print(f"STT {text}")
plan = llm_plan_command(text)
print(f"PLAN {plan}")
device_id = plan"device"
action = plan"action"
params = plan.get("params", {}) or {}
dev = DEVICESdevice_id
ip = dev"ip"
deoj = dev"deoj"
# --- アクション → ECHONET送信 ---
commands: Listbytes = []
# ON/OFFはEPC:0x80
if action in ("on", "off"):
epc, edt = op_onoff(action)
frame = make_el_seti_frame(PROFILE_SEOJ, deoj, epc, edt, tid=int(time.time()) & 0xFFFF)
commands.append(frame)
#ブラインド 開/閉/停止
if action in ("open", "close", "stop"):
epc, edt = op_blind_motion(action)
frame = make_el_seti_frame(PROFILE_SEOJ, deoj, epc, edt, tid=(int(time.time()) + 1) & 0xFFFF)
commands.append(frame)
# 照明のモード
if action == "set_mode":
mode = params.get("mode", "normal")
# カラー灯でも他モードでも、念のためON → モード
epc0, edt0 = op_onoff("on")
frame0 = make_el_seti_frame(PROFILE_SEOJ, deoj, epc0, edt0, tid=(int(time.time()) + 1) & 0xFFFF)
commands.append(frame0)
epc1, edt1 = op_light_mode(mode)
frame1 = make_el_seti_frame(PROFILE_SEOJ, deoj, epc1, edt1, tid=(int(time.time()) + 2) & 0xFFFF)
commands.append(frame1)
# 照明のRGB
if action == "set_rgb":
params = coerce_color_params(params)
r, g, b = params.get("r", 255), params.get("g", 255), params.get("b", 255)
# 1) 電源ON(安全のため。既にONでも悪影響なし)
epc0, edt0 = op_onoff("on")
frame0 = make_el_seti_frame(PROFILE_SEOJ, deoj, epc0, edt0, tid=(int(time.time()) + 1) & 0xFFFF)
commands.append(frame0)
# 2) 点灯モード=カラー灯(0x45)
epc1, edt1 = op_light_mode("color")
frame1 = make_el_seti_frame(PROFILE_SEOJ, deoj, epc1, edt1, tid=(int(time.time()) + 2) & 0xFFFF)
commands.append(frame1)
# 3) RGB設定(R,G,B の順)
epc2, edt2 = op_light_rgb(r, g, b)
frame2 = make_el_seti_frame(PROFILE_SEOJ, deoj, epc2, edt2, tid=(int(time.time()) + 3) & 0xFFFF)
commands.append(frame2)
# 送信
for i, f in enumerate(commands):
send_el(ip, f)
if i < len(commands) - 1:
time.sleep(SEND_GAP_SEC)
print(f"SEND -> {ip}:{EL_PORT} に {len(commands)} パケット送信済み。")
print(f"SEND -> {ip}:{EL_PORT} に {commands} ")
if __name__ == "__main__":
print("=== 音声→家電操作デモ(Whisper+ChatGPT+ECHONET Lite)===")
run_once()
6. 実行について
エミュレーター MoekadenRoom
https://gyazo.com/72be550f60a34b59307a02eca041ae69
手元にECHONET Lite対応家電がない場合は、MoekadenRoomの使用をおすすめします。MoekadenRoomはスマートハウス環境のエミュレーターでエアコン、照明、電動ブラインド、電子錠、温度計、スマートメーターの合計6種類の機器オブジェクトを含んでいます。MoekadenRoomのセットアップについてはこちらを参照してください。
簡易照明エミュレーター
上記のMoekadenRoomは照明のカラー指定に対応していません。
照明のカラー指定のテストができるように、python上で実行できる簡易照明エミュレーターを用意しました。このエミュレーターはMoekadenRoomと同一のPC上では起動できないので注意してください(ポート3610を取り合ってしまうため)。
※このエミュレーターGETには対応していません
code:EL_light.py
import socket
import threading
import tkinter as tk
UDP_PORT = 3610
# 簡易マッピング(EPC=0xB6, 1バイト)
MODE_NAME = {
0x40: "電球色",
0x50: "白色",
0x60: "昼白色",
0x70: "昼光色",
0x00: "その他/自動",
}
# 表示色(見やすさ用の仮色)
MODE_COLOR = {
0x40: "#ffd27a", # 電球色
0x50: "#fff9e6", # 白色
0x60: "#f0f6ff", # 昼白色
0x70: "#dff3ff", # 昼光色
0x00: "#ffffff", # その他/自動
}
def hexdump(b: bytes) -> str:
return " ".join(f"{x:02X}" for x in b)
class LightEmulator:
def __init__(self, master):
self.master = master
master.title("ECHONET Lite 照明エミュレータ")
self.power_on = False
self.mode = None # EPC 0xB6 の値(int)または None
self.rgb = None # (R,G,B) または None
self.status_label = tk.Label(master, text="状態: 未受信", font=("Helvetica", 16))
self.status_label.pack(pady=10)
self.canvas = tk.Canvas(master, width=100, height=100)
self.canvas.pack()
self.lamp = self.canvas.create_oval(10, 10, 90, 90, fill="gray", outline="black")
# 応答送信用ソケット(ユニキャスト返信に使用)
self.tx_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# スレッドでUDP受信開始
self.thread = threading.Thread(target=self.udp_listener, daemon=True)
self.thread.start()
def udp_listener(self):
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind(("0.0.0.0", UDP_PORT))
print(f"UDPポート{UDP_PORT}で待機中...")
while True:
data, addr = sock.recvfrom(1024)
print(f"受信: {addr} {hexdump(data)}")
# 応答先アドレスも渡す
self.master.after(0, self.handle_packet, data, addr)
def handle_packet(self, data: bytes, addr):
# 最低限: EHD(2) + TID(2) + SEOJ(3) + DEOJ(3) + ESV(1) + OPC(1) + EPC(1) + PDC(1)
if len(data) < 14:
return
ehd1, ehd2 = data0, data1
if (ehd1, ehd2) != (0x10, 0x81):
return # この簡易実装は 0x10 0x81 のみ
tid = data2:4
seoj = data4:7 # 送信元(相手)
# deoj = data7:10 # 宛先(本来チェックするが簡易にスキップ)
esv = data10
opc = data11
if opc < 1:
return
# このサンプルは 1プロパティのみを想定
epc = data12
pdc = data13
if len(data) < 14 + pdc:
return
edt = data14:14+pdc
# ---- 受信処理 ----
if esv in (0x60, 0x61): # SetI / SetC
updated = False
# EPCごとの更新
if epc == 0x80 and pdc == 1:
if edt0 == 0x30: # ON
self.power_on = True; updated = True
elif edt0 == 0x31: # OFF
self.power_on = False; updated = True
elif epc == 0xB6 and pdc == 1:
self.mode = edt0
self.rgb = None # 任意: モード設定時はRGBをクリア
updated = True
elif epc == 0xC0 and pdc == 3:
self.rgb = (edt0, edt1, edt2)
# 任意: RGB設定時、モードは0x00(その他/自動)に寄せる
if self.mode is None:
self.mode = 0x00
updated = True
if updated:
self.update_view()
# ---- SetC のときは 0x71(Set_Res) で応答 ----
if esv == 0x61:
# 応答EDT(現在値)を作る
if epc == 0x80:
cur = 0x30 if self.power_on else 0x31
edt_res = bytes(cur)
elif epc == 0xB6:
edt_res = bytes(self.mode if self.mode is not None else 0x00)
elif epc == 0xC0:
if self.rgb:
edt_res = bytes(self.rgb)
else:
edt_res = b"\x00\x00\x00"
else:
# 未対応EPC: 簡易実装では黙殺(必要なら異常応答を実装)
return
frame = self.build_response_set_res(tid, seoj, epc, edt_res)
self.tx_sock.sendto(frame, addr)
print(f"返信(Set_Res): {addr} {hexdump(frame)}")
def build_response_set_res(self, req_tid: bytes, req_seoj: bytes, epc: int, edt_res: bytes) -> bytes:
"""
SetC(0x61)への正常応答 Set_Res(0x71) を1プロパティで作る。
SEOJ=自機(仮に 02 90 01)、DEOJ=相手SEOJ
"""
seoj_me = b"\x02\x90\x01" # 一般照明(仮)
deoj_you = req_seoj
body = bytearray()
body += b"\x10\x81" # EHD
body += req_tid # TIDはそのまま流用
body += seoj_me # SEOJ
body += deoj_you # DEOJ
body += b"\x71" # ESV=Set_Res
body += b"\x01" # OPC=1
body.append(epc) # EPC
body.append(len(edt_res)) # PDC
body += edt_res # EDT
return bytes(body)
def update_view(self):
# 色決定
if not self.power_on:
fill = "gray"
else:
if self.rgb:
fill = "#{:02x}{:02x}{:02x}".format(*self.rgb)
elif self.mode in MODE_COLOR:
fill = MODE_COLORself.mode
else:
fill = "yellow" # デフォルト
self.canvas.itemconfig(self.lamp, fill=fill)
# ラベル文言
power_txt = "ON" if self.power_on else "OFF"
mode_txt = MODE_NAME.get(self.mode, "未設定" if self.mode is None else f"0x{self.mode:02X}")
color_txt = "#{:02X}{:02X}{:02X}".format(*self.rgb) if self.rgb else "-"
self.status_label.config(text=f"状態: {power_txt} / モード: {mode_txt} / RGB: {color_txt}")
# 起動
if __name__ == "__main__":
root = tk.Tk()
app = LightEmulator(root)
root.mainloop()
#ChatGPT #AI #ECHONET-Lite #スマートハウス #IoT #ものづくりプロジェクトII #ものづくりプロジェクトII
https://gyazo.com/71c7de59f100448c29cdb7f29fbd171b
Communication Robotics Lab.