Skip to content

Instantly share code, notes, and snippets.

@Staars
Created July 3, 2025 20:08
Show Gist options
  • Save Staars/8ed2bf9c97fc9800b839185dd1cf66ad to your computer and use it in GitHub Desktop.
Save Staars/8ed2bf9c97fc9800b839185dd1cf66ad to your computer and use it in GitHub Desktop.
Daikin AC driver for Tasmota
# Parser for comma-separated "key=value" replies.
class DAIKIN_PARSER
  # Convert a reply into a map of key -> value (both strings).
  #
  # buf: reply as a string or a `bytes` object.
  # Returns a map (despite the method name), or nil when the input is not
  # text or the reply carries a "ret" field other than "OK".
  #
  # Skipped tokens: tokens without "=", tokens with an empty value, the
  # "ssid1" field and the "ret=OK" status field itself.
  static def to_list(buf)
    import string
    var result = {}
    var msg = buf
    if classof(buf) == bytes
      msg = buf.asstring()
    end
    if type(msg) != "string"
      return nil
    end
    for e : string.split(msg, ",")
      var el = string.split(e, "=")
      # A token without "=" (e.g. an empty reply) has no el[1]; indexing it
      # would raise, so skip it together with empty values.
      if size(el) < 2 || el[1] == ""
        continue
      end
      var key = el[0]
      var value = el[1]
      if key == "ssid1" # ignore ssid
        continue
      elif key == "ret"
        if value == "OK"
          continue
        end
        log(f"DAIKIN_PARSER: Error parsing message: {e}",1)
        return nil
      elif key == "name" && string.find(value, "%") >= 0
        # Strip the "%" markers and read the remainder as a hex string.
        # Only done when a "%" is present so a plain-text name is kept as is.
        # NOTE(review): assumes every character of the name is %-encoded.
        value = bytes(string.tr(value, "%", "")).asstring()
      end
      result[key] = value
    end
    return result
  end
end
# Scans an IPv4 range by sending a UDP "basic_info" request to one address
# per 50 ms tick and collects every parsable answer as a device map.
# When the range is exhausted, `completion` is called with the device list.
class DAIKIN_DISCOVERY
  var u, devices
  static start_ip = [192,168,1,100]
  static end_ip = [192,168,1,255]
  var ip, completion

  # completion: function taking one argument, the list of discovered devices.
  # If the UDP socket cannot be opened the scan is not started and
  # `completion` is not called.
  def init(completion)
    self.completion = completion
    self.u = udp()
    if self.u.begin("", 30000)
      log(f"DAIKIN_DISCOVERY: begin scan from {self.start_ip[0]}.{self.start_ip[1]}.{self.start_ip[2]}.{self.start_ip[3]} to {self.end_ip[0]}.{self.end_ip[1]}.{self.end_ip[2]}.{self.end_ip[3]}")
    else
      log("DAIKIN_DISCOVERY: Failed to open UDP socket")
      return
    end
    # Work on a copy: assigning the static list directly would alias it, so
    # incrementing self.ip would also rewrite start_ip for this and any
    # later scan.
    self.ip = self.start_ip.copy()
    self.devices = []
    self.send()
    tasmota.add_driver(self)
  end

  # Send the discovery request to the address currently held in self.ip.
  def send()
    var msg = "DAIKIN_UDP/common/basic_info"
    self.u.send(f"{self.ip[0]}.{self.ip[1]}.{self.ip[2]}.{self.ip[3]}", 30050, bytes().fromstring(msg))
  end

  # Advance self.ip to the next address, octet by octet from the last one:
  # an octet below its end_ip bound is incremented, otherwise it is reset to
  # its start_ip value and the next higher octet is tried.
  # Returns true when a new address is available, false after the range was
  # exhausted (in which case stop() has already been called).
  def next_ip()
    var i = 3
    while i >= 0
      if self.ip[i] < self.end_ip[i]
        self.ip[i] += 1
        return true
      end
      self.ip[i] = self.start_ip[i]
      i -= 1
    end
    log("DAIKIN_DISCOVERY: Finished scanning IP range")
    self.stop()
    return false
  end

  # End the scan: unregister the driver, close the socket and hand the
  # collected devices to the completion callback.
  def stop()
    tasmota.remove_driver(self)
    self.u.close()
    self.completion(self.devices)
    log(f"DAIKIN_DISCOVERY: ended with devices: {self.devices}",4)
  end

  # Parse one received packet and record it as a device unless a device with
  # the same ip is already known. Packets that cannot be parsed are ignored.
  def parse(buf, ip, port)
    var device = nil
    try
      device = DAIKIN_PARSER.to_list(buf)
    except .. as e, m
      log(f"DAIKIN_DISCOVERY: Ignoring unparsable packet from {ip}:{port}: {e} {m}",3)
      return
    end
    if device == nil
      # The parser returns nil for replies it rejects; nothing to store.
      return
    end
    device['ip'] = ip
    for d:self.devices
      if d['ip'] == ip
        log(f"DAIKIN_DISCOVERY: Device already exists at {ip}:{port}, ignoring",4)
        return
      end
    end
    log(f"DAIKIN_DISCOVERY: Found device at {ip}:{port}",2)
    log(f"DAIKIN_DISCOVERY: Parsed device info: {device}",3)
    self.devices.push(device)
  end

  # Driver tick: handle at most one pending answer, then probe the next
  # address. Nothing is sent once the scan has finished, because stop() has
  # closed the socket by then.
  def every_50ms()
    var buf = self.u.read()
    if buf != nil
      log(f"DAIKIN_DISCOVERY: Received UDP packet: {buf}",3)
      self.parse(buf,self.u.remote_ip, self.u.remote_port)
    end
    if self.next_ip()
      self.send()
    end
  end
end
# Provides the device list: loaded from /daikin.json when that file holds a
# list, otherwise obtained through DAIKIN_DISCOVERY and then saved to the file.
# `cb` (no arguments) is called once the device list is available or discovery
# has ended without devices.
class DAIKIN_CONFIG
  var devices, discovery, finished, cb

  # cb: function without arguments, called when configuration has ended.
  # It is called synchronously from init() when the file could be loaded.
  def init(cb)
    import path
    import json
    self.cb = cb
    self.finished = false
    if path.exists("/daikin.json")
      log("DAIKIN_CONFIG: Loading configuration from /daikin.json",1)
      var f = open("/daikin.json", "r")
      var loaded = json.load(f.read())
      f.close()
      # Only accept a list; anything else (including nil from a failed
      # parse) would break the consumers iterating over `devices`.
      if classof(loaded) == list
        self.devices = loaded
        log(f"DAIKIN_CONFIG: Loaded configuration: {self.devices}",3)
        self.finished = true
        self.cb()
        return
      end
      log("DAIKIN_CONFIG: /daikin.json does not contain a device list, starting discovery",1)
    else
      log("DAIKIN_CONFIG: No configuration file found, starting discovery",1)
    end
    self.discovery = DAIKIN_DISCOVERY(/devices->self.completion(devices))
  end

  # Called by the discovery with the list of found devices.
  # An empty result leaves `finished` false and writes no file; otherwise the
  # list is saved to /daikin.json. `cb` is called in both cases.
  def completion(devices)
    import json
    if devices == nil
      devices = []
    end
    self.devices = devices
    # The scan is over in every case, so drop the reference to it.
    self.discovery = nil
    log(f"DAIKIN_CONFIG: Completed with {self.devices}",3)
    if size(devices) == 0
      log("DAIKIN_CONFIG: No devices found, discovery failed",1)
      self.cb()
      return
    end
    # A failed write must not prevent the callback from running; the devices
    # are still usable for this session.
    try
      var f = open("/daikin.json", "w")
      f.write(json.dump(self.devices))
      f.close()
      log("DAIKIN_CONFIG: Saved devices to /daikin.json",1)
    except .. as e, m
      log(f"DAIKIN_CONFIG: Could not save /daikin.json: {e} {m}",1)
    end
    self.finished = true
    self.cb()
  end
end
# Tasmota driver: obtains the device list through DAIKIN_CONFIG, polls each
# device over HTTP and publishes the values in the telemetry JSON and on the
# web UI.
class DAIKIN
  var cfg, update, need_model_info, need_sensor_info, need_control_info

  def init()
    log("DAIKIN: Initializing Daikin driver",1)
    self.cfg = DAIKIN_CONFIG(/->self.completion())
    tasmota.add_driver(self)
  end

  # HTTP GET http://<ip>/<get_msg> and parse the body with DAIKIN_PARSER.
  # Returns the parsed map, or nil when the request did not return 200 or
  # the body could not be parsed.
  def read(ip, get_msg)
    var cl = webclient()
    cl.begin(f"http://{ip}/{get_msg}")
    var r = cl.GET()
    var s = nil
    if r == 200
      s = cl.get_string()
    else
      log(f"DAIKIN: GET {get_msg} from {ip} failed with code {r}",2)
    end
    cl.close()
    if s == nil
      return nil
    end
    var parsed = nil
    try
      parsed = DAIKIN_PARSER.to_list(s)
    except .. as e, m
      log(f"DAIKIN: Could not parse reply to {get_msg} from {ip}: {e} {m}",2)
    end
    return parsed
  end

  # Callback for DAIKIN_CONFIG: the device list is ready, start polling.
  def completion()
    self.need_model_info = true
  end

  # True once the configuration object exposes a device list.
  def has_devices()
    return self.cfg != nil && self.cfg.devices != nil
  end

  # Copy every key of `info` into device map `d`.
  # Returns false (and changes nothing) when `info` is nil.
  def merge_info(d, info)
    if info == nil
      return false
    end
    for k : info.keys()
      d[k] = info[k]
    end
    return true
  end

  # Numeric value of d[key]; the parser stores values as strings, so they are
  # converted here. Missing or non-numeric entries yield 0.
  def num(d, key)
    var v = d.find(key)
    if type(v) == "string"
      v = number(v)
    end
    if type(v) == "int" || type(v) == "real"
      return v
    end
    return 0
  end

  # Driver tick: performs at most one polling step per call, in the order
  # model info, sensor info, control info.
  def every_250ms()
    # While discovery is still running there is no device list to poll.
    if !self.has_devices()
      return
    end
    # json_append() requests a refresh through `update`; re-arm the sensor
    # and control polls so the published values do not stay frozen.
    if self.update == true
      self.update = false
      self.need_sensor_info = true
      self.need_control_info = true
    end
    if self.need_model_info == true
      self.get_model_info()
      self.need_model_info = false
      log(f"DAIKIN: Configuration finished, devices found: {size(self.cfg.devices)}",2)
      for d:self.cfg.devices
        log(f"DAIKIN: Device {d.find('name')} at {d.find('ip')}, Fw: {d.find('ver')}, MAC: {d.find('mac')}", 2)
      end
    elif self.need_sensor_info != false
      self.get_sensor_info()
      self.need_sensor_info = false
    elif self.need_control_info != false
      self.get_control_info()
      self.need_control_info = false
    end
  end

  # Fetch aircon/get_model_info from every device and merge it into its map.
  def get_model_info()
    for d:self.cfg.devices
      var model_info = self.read(d["ip"], "aircon/get_model_info")
      if self.merge_info(d, model_info)
        log(f"DAIKIN = {model_info}",3)
      end
    end
  end

  # Fetch aircon/get_control_info from every device and merge it into its map.
  def get_control_info()
    for d:self.cfg.devices
      var control_info = self.read(d["ip"], "aircon/get_control_info")
      if self.merge_info(d, control_info)
        log(f"DAIKIN: Control info for {d.find('name')}: {control_info}",3)
      end
    end
  end

  # Fetch aircon/get_sensor_info from every device and merge it into its map.
  def get_sensor_info()
    for d:self.cfg.devices
      var sensor_info = self.read(d["ip"], "aircon/get_sensor_info")
      if self.merge_info(d, sensor_info)
        log(f"DAIKIN: Sensor info for {d.find('name')}: {sensor_info}",3)
      end
    end
  end

  # Append one "Daikin" object per device to the telemetry JSON.
  # Returns nil until the first sensor poll has been attempted.
  # NOTE(review): with more than one device the key "Daikin" is emitted
  # repeatedly, exactly as before; consumers may only see the last one.
  def json_append()
    import string
    if self.need_sensor_info == nil return nil end
    if !self.has_devices() return nil end
    var msg = ""
    for d:self.cfg.devices
      # %i / %.2f are given real numbers; the device map holds strings.
      msg += string.format(",\"Daikin\":{\"Name\":\"%s\",\"Power\":%i,\"Temperature\":%.2f}",
                           str(d.find("name")), int(self.num(d, "pow")), real(self.num(d, "htemp")))
    end
    tasmota.response_append(msg)
    self.update = true # Trigger update for next cycle
    return true
  end

  # Print the current device list.
  def dump()
    print(self.cfg.devices)
  end

  # Render name, address and temperatures of every device on the web UI.
  # Returns nil until the first sensor poll has been attempted.
  def web_sensor()
    import string
    if self.need_sensor_info == nil return nil end
    if !self.has_devices() return nil end
    var fmt = "{s}%s{m}%s{e}" +
              "{s}Temperature inner{m}%.2f °C{e}" +
              "{s}Temperature outer{m}%.2f °C{e}" +
              "{s}Temperature target{m}%.2f °C{e}"
    var msg = ""
    for d:self.cfg.devices
      # Accumulate: a plain assignment here would keep only the last device.
      msg += string.format(fmt,
                           str(d.find("name")), str(d.find("ip")),
                           real(self.num(d, "htemp")), real(self.num(d, "otemp")), real(self.num(d, "stemp")))
    end
    tasmota.web_send_decimal(msg)
    return true
  end
end
# Create the driver instance when the script is loaded; DAIKIN.init() registers
# it with tasmota.add_driver(). The instance is kept in `d`.
d = DAIKIN()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment