(pytuya in localtuya) vs tinytuya
119 removals
672 lines
102 additions
669 lines
# Python module to interface with Shenzhen Xenon ESP8266MOD WiFi smart devices
# TinyTuya Module
# E.g. https://wikidevi.com/wiki/Xenon_SM-PW701U
# -*- coding: utf-8 -*-
# SKYROKU SM-PW701U Wi-Fi Plug Smart Plug
"""
# Wuudi SM-S0301-US - WIFI Smart Power Socket Multi Plug with 4 AC Outlets and 4 USB Charging Works with Alexa
Python module to interface with Tuya WiFi smart devices
#
# This would not exist without the protocol reverse engineering from
Author: Jason A. Cox
# https://github.com/codetheweb/tuyapi by codetheweb and blackrozes
For more information see https://github.com/jasonacox/tinytuya
#
# Tested with Python 2.7 and Python 3.6.1 only
Classes
OutletDevice(dev_id, address, local_key=None)
CoverDevice(dev_id, address, local_key=None)
BulbDevice(dev_id, address, local_key=None)
dev_id (str): Device ID e.g. 01234567891234567890
address (str): Device Network IP Address e.g. 10.0.1.99
local_key (str, optional): The encryption key. Defaults to None.
Functions
json = status() # returns json payload
set_version(version) # 3.1 [default] or 3.3
set_dpsUsed(dpsUsed)
set_status(on, switch=1) # Set status of the device to 'on' or 'off' (bool)
set_value(index, value) # Set int value of any index.
turn_on(switch=1):
turn_off(switch=1):
set_timer(num_secs):
CoverDevice:
open_cover(switch=1):
close_cover(switch=1):
stop_cover(switch=1):
BulbDevice
set_colour(r, g, b):
set_white(brightness, colourtemp):
set_brightness(brightness):
set_colourtemp(colourtemp):
result = brightness():
result = colourtemp():
(r, g, b) = colour_rgb():
(h,s,v) = colour_hsv()
result = state():
Credits
* TuyaAPI https://github.com/codetheweb/tuyapi by codetheweb and blackrozes
For protocol reverse engineering
* PyTuya https://github.com/clach04/python-tuya by clach04
The origin of this python module (now abandoned)
* LocalTuya https://github.com/rospogrigio/localtuya-homeassistant by rospogrigio
Updated pytuya to support devices with Device IDs of 22 characters
"""
import base64
import base64
from hashlib import md5
from hashlib import md5
import json
import json
import logging
import logging
import socket
import socket
import sys
import sys
import time
import time
import colorsys
import colorsys
import binascii
import binascii
# Required module: pycryptodome
try:
try:
#raise ImportError
import Crypto
import Crypto
from Crypto.Cipher import AES # PyCrypto
from Crypto.Cipher import AES # PyCrypto
except ImportError:
except ImportError:
Crypto = AES = None
Crypto = AES = None
import pyaes # https://github.com/ricmoo/pyaes
import pyaes # https://github.com/ricmoo/pyaes
# Package metadata. `version`, `version_string` and `__version__` are all kept
# as aliases because the interleaved revisions exposed different names.
version_tuple = (7, 0, 9)
version = version_string = __version__ = '%d.%d.%d' % version_tuple
__author__ = 'rospogrigio'

log = logging.getLogger(__name__)
logging.basicConfig()  # TODO include function name/line numbers in log
# log.setLevel(level=logging.DEBUG)  # Uncomment to debug

# Record the runtime environment once at import time (debug level only).
log.debug('%s version %s', __name__, __version__)
log.debug('Python %s on %s', sys.version, sys.platform)
if Crypto is None:
    # The PyCrypto import failed, so the pure-Python pyaes fallback is in use.
    log.debug('Using pyaes version %r', pyaes.VERSION)
    log.debug('Using pyaes from %r', pyaes.__file__)
else:
    log.debug('Using PyCrypto %r', Crypto.version_info)
    log.debug('Using PyCrypto from %r', Crypto.__file__)

# Command names used as keys into payload_dict[<dev_type>].
SET = 'set'
STATUS = 'status'

# Protocol version markers placed in front of encrypted payloads.
PROTOCOL_VERSION_BYTES_31 = b'3.1'
PROTOCOL_VERSION_BYTES_33 = b'3.3'

IS_PY2 = sys.version_info[0] == 2
class AESCipher(object):
    """AES-ECB helper used to encrypt and decrypt device payloads.

    The PyCrypto backend is used when the module-level ``Crypto`` name is
    truthy; otherwise the ``pyaes`` block feeder is used.
    """

    def __init__(self, key):
        """Remember the key and the block size.

        Args:
            key (bytes): AES key, handed unchanged to the cipher backend.
        """
        # A block size of 32 worked for ON but not for OFF; 16 matches the
        # padding of the JS implementation (https://github.com/codetheweb/tuyapi/).
        self.bs = 16
        self.key = key

    def encrypt(self, raw, use_base64=True):
        """Encrypt ``raw`` with AES in ECB mode.

        Args:
            raw (bytes): Plain payload.
            use_base64 (bool): When true the cipher text is base64 encoded
                before it is returned.

        Returns:
            bytes: The cipher text, base64 encoded if requested.
        """
        if Crypto:
            cipher = AES.new(self.key, mode=AES.MODE_ECB)
            crypted_text = cipher.encrypt(self._pad(raw))
        else:
            # no IV, the pyaes feeder pads to 16 bytes by itself
            cipher = pyaes.blockfeeder.Encrypter(pyaes.AESModeOfOperationECB(self.key))
            crypted_text = cipher.feed(raw)
            crypted_text += cipher.feed()  # flush final block
        if use_base64:
            return base64.b64encode(crypted_text)
        return crypted_text

    def decrypt(self, enc, use_base64=True):
        """Decrypt ``enc`` with AES in ECB mode.

        Args:
            enc (bytes): Cipher text.
            use_base64 (bool): When true ``enc`` is base64 decoded first.

        Returns:
            With the PyCrypto backend the unpadded plain text decoded as
            UTF-8; with the pyaes backend whatever the block feeder yields,
            without decoding. Callers must accept both.
        """
        if use_base64:
            enc = base64.b64decode(enc)

        if Crypto:
            cipher = AES.new(self.key, AES.MODE_ECB)
            raw = cipher.decrypt(enc)
            return self._unpad(raw).decode('utf-8')

        # no IV, the pyaes feeder strips the padding by itself
        cipher = pyaes.blockfeeder.Decrypter(pyaes.AESModeOfOperationECB(self.key))
        plain_text = cipher.feed(enc)
        plain_text += cipher.feed()  # flush final block
        return plain_text

    def _pad(self, s):
        """Append 1..bs padding bytes, each holding the padding length."""
        padnum = self.bs - len(s) % self.bs
        return s + padnum * chr(padnum).encode()

    @staticmethod
    def _unpad(s):
        """Strip the padding whose length is stored in the last byte.

        Empty input is returned unchanged instead of raising ``TypeError``.
        """
        if not s:
            return s
        return s[:-ord(s[len(s) - 1:])]
def bin2hex(x, pretty=False):
    """Return the upper-case hexadecimal text for a byte string.

    Args:
        x (bytes): Data to convert.
        pretty (bool): When true every byte is followed by a single space
            (including the last one).

    Returns:
        str: Two hex digits per input byte.
    """
    space = ' ' if pretty else ''
    # bytearray yields ints on Python 2 and 3 alike, so no version branch is needed.
    return ''.join('%02X%s' % (byte, space) for byte in bytearray(x))
def hex2bin(x):
    """Convert a hexadecimal string into the bytes it describes.

    Args:
        x (str): Hex digits, two per byte.

    Returns:
        bytes: The decoded data.
    """
    try:
        return bytes.fromhex(x)
    except AttributeError:
        # Python 2: ``bytes`` is ``str`` and has no ``fromhex``.
        return x.decode('hex')
# This is intended to match requests.json payload at https://github.com/codetheweb/tuyapi
# Items device20 and device22 are to be used depending on the length of dev_id (20 or 22 chars).
#
# "prefix" is followed by the command byte ("hexByte"), some zero padding and
# then the length of the remaining payload, i.e. command + suffix.
payload_dict = {
    "device20": {
        "status": {
            "hexByte": "0a",
            "command": {"gwId": "", "devId": ""},
        },
        "set": {
            "hexByte": "07",
            "command": {"devId": "", "uid": "", "t": ""},
        },
        "prefix": "000055aa00000000000000",
        "suffix": "000000000000aa55",
    },
    "device22": {
        "status": {
            "hexByte": "0d",
            "command": {"devId": "", "uid": "", "t": ""},
        },
        "set": {
            "hexByte": "07",
            "command": {"devId": "", "uid": "", "t": ""},
        },
        "prefix": "000055aa00000000000000",
        "suffix": "000000000000aa55",
    },
}
class XenonDevice(object):
    """Builds protocol frames for one device and exchanges them over TCP."""

    def __init__(self, dev_id, address, local_key=None, connection_timeout=10):
        """
        Represents a Tuya device.

        Args:
            dev_id (str): The device id.
            address (str): The network address.
            local_key (str, optional): The encryption key. Defaults to None.
            connection_timeout (int, optional): Value handed to
                ``socket.settimeout``. Defaults to 10.

        Attributes:
            port (int): The port to connect to.
        """
        self.id = dev_id
        self.address = address
        # Encode only real text keys: the default None (and keys that are
        # already bytes) must not blow up in the constructor.
        if local_key is not None and not isinstance(local_key, bytes):
            local_key = local_key.encode('latin1')
        self.local_key = local_key
        self.connection_timeout = connection_timeout
        self.version = 3.1
        # The payload template is selected by the length of the device id.
        self.dev_type = 'device22' if len(dev_id) == 22 else 'device20'
        self.port = 6668  # default - do not expect caller to pass in

    def __repr__(self):
        return '%r' % ((self.id, self.address),)  # FIXME can do better than this

    def _send_receive(self, payload):
        """
        Send single buffer `payload` and receive a single buffer.

        The socket is closed on every path, including failures; the original
        exception is logged and re-raised.

        Args:
            payload(bytes): Data to send.

        Returns:
            bytes: The data read from the device.
        """
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
            s.settimeout(self.connection_timeout)
            try:
                s.connect((self.address, self.port))
            except Exception:
                log.error('Failed to connect to %s. Raising Exception.', self.address)
                raise
            try:
                # sendall keeps writing until the whole frame has gone out
                s.sendall(payload)
            except Exception:
                log.error('Failed to send payload to %s. Raising Exception.', self.address)
                raise
            try:
                data = s.recv(1024)
                # sometimes the first packet does not contain data
                # (typically 28 bytes): need to read again
                if len(data) < 40:
                    time.sleep(0.1)
                    data = s.recv(1024)
            except Exception:
                log.error('Failed to receive data from %s. Raising Exception.', self.address)
                raise
        finally:
            s.close()
        return data

    def set_version(self, version):
        """Select the protocol version (compared against 3.3 when building payloads)."""
        self.version = version

    def set_dpsUsed(self, dpsUsed):
        """Store the 'dps' value sent with commands whose hexByte is '0d'.

        ``generate_payload`` reads this attribute for such commands, so it
        has to be set before they are generated.
        """
        self.dpsUsed = dpsUsed

    def generate_payload(self, command, data=None):
        """
        Generate the payload to send.

        The frame is: prefix, command byte, length, payload, CRC32, suffix.

        Args:
            command(str): The type of command.
                This is one of the entries from payload_dict
            data(dict, optional): The data to be sent.
                This is what will be passed via the 'dps' entry

        Returns:
            bytes: The complete frame.
        """
        template = payload_dict[self.dev_type]
        command_hb = template[command]['hexByte']
        # Work on a copy: the template is shared module state and must not
        # keep ids or 'dps' values from a previous call or another device.
        json_data = dict(template[command]['command'])

        if 'gwId' in json_data:
            json_data['gwId'] = self.id
        if 'devId' in json_data:
            json_data['devId'] = self.id
        if 'uid' in json_data:
            json_data['uid'] = self.id  # still use id, no separate uid
        if 't' in json_data:
            json_data['t'] = str(int(time.time()))

        if data is not None:
            json_data['dps'] = data
        if command_hb == '0d':
            json_data['dps'] = self.dpsUsed

        json_payload = json.dumps(json_data)
        json_payload = json_payload.replace(' ', '')  # if spaces are not removed device does not respond!
        json_payload = json_payload.encode('utf-8')
        log.debug('json_payload=%r', json_payload)

        if self.version == 3.3:
            self.cipher = AESCipher(self.local_key)  # expect to connect and then disconnect to set new
            json_payload = self.cipher.encrypt(json_payload, False)
            self.cipher = None
            if command_hb != '0a':
                # add the 3.3 header
                json_payload = PROTOCOL_VERSION_BYTES_33 + b"\0\0\0\0\0\0\0\0\0\0\0\0" + json_payload
        elif command == SET:
            # need to encrypt
            self.cipher = AESCipher(self.local_key)  # expect to connect and then disconnect to set new
            json_payload = self.cipher.encrypt(json_payload)
            preMd5String = (b'data=' + json_payload + b'||lpv=' +
                            PROTOCOL_VERSION_BYTES_31 + b'||' + self.local_key)
            hexdigest = md5(preMd5String).hexdigest()
            # 16 hex characters taken from the middle of the digest
            json_payload = PROTOCOL_VERSION_BYTES_31 + hexdigest[8:][:16].encode('latin1') + json_payload
            self.cipher = None  # expect to connect and then disconnect to set new

        postfix_payload = json_payload + hex2bin(template['suffix'])
        # The length is written as a 4-byte field; for lengths 0x10-0xff this
        # is byte-identical to the former '000000' + single-byte encoding.
        header = hex2bin(template['prefix'] + command_hb + '%08x' % len(postfix_payload))
        frame = header + postfix_payload

        # calc the CRC of everything except where the CRC goes and the suffix
        crc = binascii.crc32(frame[:-8]) & 0xffffffff
        return frame[:-8] + hex2bin('%08X' % crc) + frame[-4:]
class Device(XenonDevice):
    """Adds status queries and value setters on top of XenonDevice."""

    def __init__(self, dev_id, address, local_key=None, dev_type=None):
        """
        Args:
            dev_id (str): The device id.
            address (str): The network address.
            local_key (str, optional): The encryption key. Defaults to None.
            dev_type (str, optional): Overrides the payload template that the
                base class derives from the length of ``dev_id``.
        """
        # dev_type must not be forwarded positionally: the fourth parameter
        # of the base constructor is the connection timeout.
        super(Device, self).__init__(dev_id, address, local_key)
        if dev_type is not None:
            self.dev_type = dev_type

    @staticmethod
    def _parse_json(raw):
        """Decode ``raw`` when it is not already ``str`` and parse it as JSON."""
        if not isinstance(raw, str):
            raw = raw.decode()
        return json.loads(raw)

    @staticmethod
    def _dps_sort_key(dps_id):
        """Order numeric DPS ids by value and any other id after them."""
        text = str(dps_id)
        if text.isdigit():
            return (0, int(text), text)
        return (1, 0, text)

    def status(self):
        """Query the device and return its decoded status.

        Returns:
            The parsed JSON object, or the raw payload slice when its format
            is not recognised (an error is logged in that case).
        """
        log.debug('status() entry (dev_type is %s)', self.dev_type)
        # open device, send request, then close connection
        payload = self.generate_payload('status')
        data = self._send_receive(payload)
        log.debug('status received data=%r', data)

        result = data[20:-8]  # hard coded offsets
        if self.dev_type != 'device20':
            result = result[15:]
        log.debug('result=%r', result)

        if result.startswith(b'{'):
            # this is the regular expected code path
            return self._parse_json(result)

        if result.startswith(PROTOCOL_VERSION_BYTES_31):
            # got an encrypted payload, happens occasionally
            # expect resulting json to look similar to::
            #   {"devId":"ID","dps":{"1":true,"2":0},"t":EPOCH_SECS,"s":3_DIGIT_NUM}
            # NOTE dps.2 may or may not be present
            result = result[len(PROTOCOL_VERSION_BYTES_31):]  # remove version header
            result = result[16:]  # remove 16 bytes that appear to be the MD5 hexdigest of the payload
            result = AESCipher(self.local_key).decrypt(result)
            log.debug('decrypted result=%r', result)
            return self._parse_json(result)

        if self.version == 3.3:
            result = AESCipher(self.local_key).decrypt(result, False)
            log.debug('decrypted result=%r', result)
            return self._parse_json(result)

        log.error('Unexpected status() payload=%r', result)
        return result

    def set_status(self, on, switch=1):
        """
        Set status of the device to 'on' or 'off'.

        Args:
            on(bool): True for 'on', False for 'off'.
            switch(int): The switch to set

        Returns:
            bytes: The raw reply of the device.
        """
        # open device, send request, then close connection
        if isinstance(switch, int):
            switch = str(switch)  # index and payload is a string
        payload = self.generate_payload(SET, {switch: on})
        data = self._send_receive(payload)
        log.debug('set_status received data=%r', data)
        return data

    def set_value(self, index, value):
        """
        Set int value of any index.

        Args:
            index(int): index to set
            value(int): new value for the index

        Returns:
            bytes: The raw reply of the device.
        """
        # open device, send request, then close connection
        if isinstance(index, int):
            index = str(index)  # index and payload is a string
        payload = self.generate_payload(SET, {index: value})
        return self._send_receive(payload)

    def turn_on(self, switch=1):
        """Turn the device on"""
        self.set_status(True, switch)

    def turn_off(self, switch=1):
        """Turn the device off"""
        self.set_status(False, switch)

    def set_timer(self, num_secs):
        """
        Set a timer.

        Args:
            num_secs(int): Number of seconds

        Returns:
            bytes: The raw reply of the device.
        """
        # FIXME / TODO support schemas? Accept timer id number as parameter?
        # Heuristic: query status, pick the last DPS id as that is probably
        # the timer. Ids are ordered numerically so that '10' follows '2'.
        status = self.status()
        devices_numbers = sorted(status['dps'], key=self._dps_sort_key)
        dps_id = devices_numbers[-1]

        payload = self.generate_payload(SET, {dps_id: num_secs})
        data = self._send_receive(payload)
        log.debug('set_timer received data=%r', data)
        return data
class OutletDevice(Device):
    """
    Represents a Tuya based Smart Plug or Switch.

    Args:
        dev_id (str): The device id.
        address (str): The network address.
        local_key (str, optional): The encryption key. Defaults to None.
    """

    def __init__(self, dev_id, address, local_key=None):
        super(OutletDevice, self).__init__(dev_id, address, local_key)
class FanDevice(Device):
    """Device subclass that declares the DPS index used for the speed.

    Args:
        dev_id (str): The device id.
        address (str): The network address.
        local_key (str, optional): The encryption key. Defaults to None.
    """

    DPS_INDEX_SPEED = '2'

    def __init__(self, dev_id, address, local_key=None):
        super(FanDevice, self).__init__(dev_id, address, local_key)


class CoverDevice(Device):
    """
    Represents a Tuya based Smart Powered Windows Cover.

    Args:
        dev_id (str): The device id.
        address (str): The network address.
        local_key (str, optional): The encryption key. Defaults to None.
    """

    DPS_INDEX_MOVE = '1'
    DPS_INDEX_BL = '101'

    DPS_2_STATE = {
        '1': 'movement',
        '101': 'backlight',
    }

    def __init__(self, dev_id, address, local_key=None):
        # Environment details go to the debug log instead of stdout so that
        # creating a device does not write to the console.
        log.debug('%s version %s', __name__, version)
        log.debug('Python %s on %s', sys.version, sys.platform)
        if Crypto is None:
            log.debug('Using pyaes version %r', pyaes.VERSION)
            log.debug('Using pyaes from %r', pyaes.__file__)
        else:
            log.debug('Using PyCrypto %r', Crypto.version_info)
            log.debug('Using PyCrypto from %r', Crypto.__file__)
        super(CoverDevice, self).__init__(dev_id, address, local_key)

    def open_cover(self, switch=1):
        """Open the cover"""
        self.set_status('on', switch)

    def close_cover(self, switch=1):
        """Close the cover"""
        self.set_status('off', switch)

    def stop_cover(self, switch=1):
        """Stop the motion of the cover"""
        self.set_status('stop', switch)


# Alias for callers that use the name from the other revision of this module.
CoverEntity = CoverDevice
class BulbDevice(Device):
class BulbDevice(Device):
"""
Represents a Tuya based Smart Light/Bulb.
Args:
dev_id (str): The device id.
address (str): The network address.
local_key (str, optional): The encryption key. Defaults to None.
"""
DPS_INDEX_ON = '1'
DPS_INDEX_ON = '1'
DPS_INDEX_MODE = '2'
DPS_INDEX_MODE = '2'
DPS_INDEX_BRIGHTNESS = '3'
DPS_INDEX_BRIGHTNESS = '3'
DPS_INDEX_COLOURTEMP = '4'
DPS_INDEX_COLOURTEMP = '4'
DPS_INDEX_COLOUR = '5'
DPS_INDEX_COLOUR = '5'
DPS = 'dps'
DPS = 'dps'
DPS_MODE_COLOUR = 'colour'
DPS_MODE_COLOUR = 'colour'
DPS_MODE_WHITE = 'white'
DPS_MODE_WHITE = 'white'
DPS_2_STATE = {
DPS_2_STATE = {
'1':'is_on',
'1':'is_on',
'2':'mode',
'2':'mode',
'3':'brightness',
'3':'brightness',
'4':'colourtemp',
'4':'colourtemp',
'5':'colour',
'5':'colour',
}
}
def __init__(self, dev_id, address, local_key=None):
    """Forward the connection details unchanged to the parent constructor.

    Args:
        dev_id (str): The device id.
        address (str): The network address.
        local_key (str, optional): The encryption key. Defaults to None.
    """
    super(BulbDevice, self).__init__(dev_id, address, local_key)
@staticmethod
def _rgb_to_hexvalue(r, g, b):
    """
    Convert an RGB value to the hex representation expected by tuya.

    Index '5' (DPS_INDEX_COLOUR) is assumed to be in the format:
    rrggbb0hhhssvv

    While r, g and b are just hexadecimal values of the corresponding
    Red, Green and Blue values, the h, s and v values (which are values
    between 0 and 1) are scaled to 360 (h) and 255 (s and v) respectively.

    Args:
        r(int): Value for the colour red as int from 0-255.
        g(int): Value for the colour green as int from 0-255.
        b(int): Value for the colour blue as int from 0-255.

    Returns:
        str: The 14 character lower-case hex string.
    """
    # Divide by a float so the channels are not truncated to 0 on Python 2.
    hue, sat, val = colorsys.rgb_to_hsv(r / 255.0, g / 255.0, b / 255.0)
    rgb_part = '%02x%02x%02x' % (int(r), int(g), int(b))
    # Hue takes four digits (leading zero + three), saturation and value two each.
    hsv_part = '%04x%02x%02x' % (int(hue * 360), int(sat * 255), int(val * 255))
    return rgb_part + hsv_part
@staticmethod
def _hexvalue_to_rgb(hexvalue):
    """
    Converts the hexvalue used by Tuya for colour representation into
    an RGB value.

    Args:
        hexvalue(string): The hex representation generated by BulbDevice._rgb_to_hexvalue()

    Returns:
        tuple: (r, g, b) as ints parsed from the first six hex digits.
    """
    red, green, blue = (int(hexvalue[pos:pos + 2], 16) for pos in (0, 2, 4))
    return (red, green, blue)
@staticmethod
def _hexvalue_to_hsv(hexvalue):
    """
    Converts the hexvalue used by Tuya for colour representation into
    an HSV value.

    Args:
        hexvalue(string): The hex representation generated by BulbDevice._rgb_to_hexvalue()

    Returns:
        tuple: (h, s, v) as floats; h is scaled by 360, s and v by 255.
    """
    # Float divisors keep the fractions intact on Python 2 as well.
    h = int(hexvalue[7:10], 16) / 360.0
    s = int(hexvalue[10:12], 16) / 255.0
    v = int(hexvalue[12:14], 16) / 255.0
    return (h, s, v)
def set_colour(self, r, g, b):
    """
    Set colour of an rgb bulb.

    Args:
        r(int): Value for the colour red as int from 0-255.
        g(int): Value for the colour green as int from 0-255.
        b(int): Value for the colour blue as int from 0-255.

    Returns:
        bytes: The raw reply of the device.

    Raises:
        ValueError: If a channel is outside 0-255.
    """
    # Validate in r, g, b order so the first offending channel is reported.
    for channel, level in (('red', r), ('green', g), ('blue', b)):
        if not 0 <= level <= 255:
            raise ValueError("The value for %s needs to be between 0 and 255." % channel)

    hexvalue = BulbDevice._rgb_to_hexvalue(r, g, b)

    # Switch the bulb to colour mode and send the colour in the same command.
    payload = self.generate_payload(SET, {
        self.DPS_INDEX_MODE: self.DPS_MODE_COLOUR,
        self.DPS_INDEX_COLOUR: hexvalue})
    data = self._send_receive(payload)
    return data
def set_white(self, brightness, colourtemp):
    """
    Switch an rgb bulb to white mode with the given brightness and
    colour temperature.

    Args:
        brightness(int): Value for the brightness (25-255).
        colourtemp(int): Value for the colour temperature (0-255).

    Returns:
        Whatever self._send_receive() returns for the generated SET payload.

    Raises:
        ValueError: If brightness is outside 25-255 or colourtemp is
            outside 0-255.
    """
    if not 25 <= brightness <= 255:
        raise ValueError("The brightness needs to be between 25 and 255.")
    if not 0 <= colourtemp <= 255:
        raise ValueError("The colour temperature needs to be between 0 and 255.")

    # Mode, brightness and colour temperature go out in a single SET request.
    dps_values = {
        self.DPS_INDEX_MODE: self.DPS_MODE_WHITE,
        self.DPS_INDEX_BRIGHTNESS: brightness,
        self.DPS_INDEX_COLOURTEMP: colourtemp,
    }
    return self._send_receive(self.generate_payload(SET, dps_values))
def set_brightness(self, brightness):
    """
    Apply a new brightness value to an rgb bulb.

    Args:
        brightness(int): Value for the brightness (25-255).

    Returns:
        Whatever self._send_receive() returns for the generated SET payload.

    Raises:
        ValueError: If brightness is outside 25-255.
    """
    if not 25 <= brightness <= 255:
        raise ValueError("The brightness needs to be between 25 and 255.")

    dps_values = {self.DPS_INDEX_BRIGHTNESS: brightness}
    return self._send_receive(self.generate_payload(SET, dps_values))
def set_colourtemp(self, colourtemp):
    """
    Apply a new colour temperature to an rgb bulb.

    Args:
        colourtemp(int): Value for the colour temperature (0-255).

    Returns:
        Whatever self._send_receive() returns for the generated SET payload.

    Raises:
        ValueError: If colourtemp is outside 0-255.
    """
    if not 0 <= colourtemp <= 255:
        raise ValueError("The colour temperature needs to be between 0 and 255.")

    dps_values = {self.DPS_INDEX_COLOURTEMP: colourtemp}
    return self._send_receive(self.generate_payload(SET, dps_values))
def brightness(self):
    """Return the brightness entry from the device's current status."""
    dps_values = self.status()[self.DPS]
    return dps_values[self.DPS_INDEX_BRIGHTNESS]
def colourtemp(self):
    """Return the colour temperature entry from the device's current status."""
    dps_values = self.status()[self.DPS]
    return dps_values[self.DPS_INDEX_COLOURTEMP]
def colour_rgb(self):
    """Return the colour entry of the current status as an RGB value."""
    dps_values = self.status()[self.DPS]
    return BulbDevice._hexvalue_to_rgb(dps_values[self.DPS_INDEX_COLOUR])
def colour_hsv(self):
    """Return the colour entry of the current status as an HSV value."""
    dps_values = self.status()[self.DPS]
    return BulbDevice._hexvalue_to_hsv(dps_values[self.DPS_INDEX_COLOUR])
def state(self):
    """
    Return state of Bulb.

    Returns:
        dict: One entry per DPS key in the current status whose integer
        value is at most 5. Each entry is keyed by the name that
        self.DPS_2_STATE gives for that DPS key and holds the reported value.
    """
    dps_values = self.status()[self.DPS]
    return {
        self.DPS_2_STATE[index]: reading
        for index, reading in dps_values.items()
        if int(index) <= 5
    }