91 lines
3.1 KiB
Python
91 lines
3.1 KiB
Python
|
|
import broadlink
|
||
|
|
import time
|
||
|
|
import json
|
||
|
|
import os
|
||
|
|
import logging
|
||
|
|
from pathlib import Path
|
||
|
|
from typing import Optional, Dict, List, Any
|
||
|
|
|
||
|
|
logger = logging.getLogger(__name__)
|
||
|
|
|
||
|
|
BASE_DIR = Path(__file__).parent
|
||
|
|
CODES_FILE = BASE_DIR / "broadlink_codes.json"
|
||
|
|
|
||
|
|
DEFAULT_CONFIG = {
|
||
|
|
"ip": "192.168.1.192",
|
||
|
|
"mac": "78:0f:77:18:08:00",
|
||
|
|
"port": 80,
|
||
|
|
"dev_type": 0x2712 # RM Pro+
|
||
|
|
}
|
||
|
|
|
||
|
|
def load_codes() -> Dict[str, str]:
|
||
|
|
"""Загружает коды, автоматически обрезая пробелы в именах команд."""
|
||
|
|
if CODES_FILE.exists():
|
||
|
|
with open(CODES_FILE, "r", encoding="utf-8") as f:
|
||
|
|
raw = json.load(f)
|
||
|
|
return {k.strip(): v for k, v in raw.items()}
|
||
|
|
return {}
|
||
|
|
|
||
|
|
def save_codes(codes: Dict[str, str]) -> bool:
|
||
|
|
with open(CODES_FILE, "w", encoding="utf-8") as f:
|
||
|
|
json.dump(codes, f, indent=4, ensure_ascii=False)
|
||
|
|
return True
|
||
|
|
|
||
|
|
def create_device(ip: str = None, mac: str = None, port: int = None, dev_type: int = None) -> Optional[Any]:
|
||
|
|
ip = ip or DEFAULT_CONFIG["ip"]
|
||
|
|
mac = mac or DEFAULT_CONFIG["mac"]
|
||
|
|
port = port or DEFAULT_CONFIG["port"]
|
||
|
|
dev_type = dev_type or DEFAULT_CONFIG["dev_type"]
|
||
|
|
|
||
|
|
try:
|
||
|
|
mac_bytes = bytes.fromhex(mac.replace(":", "").lower())
|
||
|
|
dev = broadlink.gendevice(dev_type=dev_type, host=(ip, port), mac=mac_bytes)
|
||
|
|
dev.auth()
|
||
|
|
logger.info(f"✅ Устройство {ip} авторизовано (тип 0x{dev_type:04x})")
|
||
|
|
return dev
|
||
|
|
except Exception as e:
|
||
|
|
logger.error(f"❌ Ошибка подключения: {e}")
|
||
|
|
return None
|
||
|
|
|
||
|
|
def learn_code(dev, name: str, timeout: int = 15) -> Optional[str]:
|
||
|
|
try:
|
||
|
|
dev.enter_learning()
|
||
|
|
except Exception as e:
|
||
|
|
logger.error(f"⚠️ Не удалось войти в режим обучения: {e}")
|
||
|
|
return None
|
||
|
|
|
||
|
|
for _ in range(timeout):
|
||
|
|
time.sleep(1)
|
||
|
|
try:
|
||
|
|
data = dev.check_data()
|
||
|
|
if data:
|
||
|
|
logger.info(f"✅ Код '{name}' захвачен! ({len(data)} байт)")
|
||
|
|
return data.hex()
|
||
|
|
except (broadlink.exceptions.ReadError, AttributeError, Exception):
|
||
|
|
continue
|
||
|
|
logger.warning(f"❌ Таймаут для '{name}'")
|
||
|
|
return None
|
||
|
|
|
||
|
|
def send_command(dev, command_name: str, codes: Dict[str, str] = None) -> bool:
|
||
|
|
if codes is None:
|
||
|
|
codes = load_codes()
|
||
|
|
if command_name not in codes:
|
||
|
|
logger.error(f"❌ Команда '{command_name}' не найдена.")
|
||
|
|
return False
|
||
|
|
try:
|
||
|
|
hex_code = codes[command_name]
|
||
|
|
data = bytes.fromhex(hex_code)
|
||
|
|
dev.send_data(data)
|
||
|
|
logger.info(f"📤 Отправлено: '{command_name}' ✅")
|
||
|
|
return True
|
||
|
|
except Exception as e:
|
||
|
|
logger.error(f"❌ Ошибка отправки '{command_name}': {e}")
|
||
|
|
return False
|
||
|
|
|
||
|
|
def discover_devices() -> List[Dict[str, Any]]:
|
||
|
|
try:
|
||
|
|
devices = broadlink.discover(timeout=5)
|
||
|
|
return [{"host": d.host[0], "mac": d.mac.hex(), "type": d.devtype} for d in devices]
|
||
|
|
except Exception as e:
|
||
|
|
logger.error(f"Ошибка поиска: {e}")
|
||
|
|
return []
|