-
Notifications
You must be signed in to change notification settings - Fork 7
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Encryption #2
Labels
Comments
BugBlue has donated some code that does the encryption. Someone[TM] should merge that with our code.

class SSP:
# Command codes: each is a one-character string that the cmd_* methods pass
# to message() as the first byte of the payload.
SSP_RESET="\x01"
SSP_INHIBIT="\x02"
SSP_DISPLAYON="\x03"
SSP_DISPLAYOFF="\x04"
SSP_SETUP="\x05"
SSP_PROTO="\x06"
SSP_POLL="\x07"
SSP_DISABLE="\x09"
SSP_ENABLE="\x0A"
SSP_LEN="\x0B"
SSP_SERIAL="\x0C"
SSP_SYNC="\x11"
SSP_FIRMWARE="\x20"
SSP_DATASET="\x21"
# Codes used by the key-exchange steps in setupencryption().
SSP_SETGENERATOR="\x4A"
SSP_SETMODULUS="\x4B"
SSP_KEYEXCHANGE="\x4C"
# Truthy value enables the logging.debug() traces in checkcrc/message/getmsg.
debug=1
# Set to 1 by cmd_enable() and back to 0 by cmd_disable().
enable=0
# First 8 bytes of the AES key built in setupencryption(); the value equals
# 0x0123456701234567.
fixedkey=81985526925837671
def upcrc(self, data):
    """Return the 16-bit CRC of *data*.

    The CRC is computed MSB-first with polynomial 0x8005 and seed 0xFFFF.

    data -- a str (each character is passed through ord()) or a
            bytes/bytearray whose items are already integers.

    Returns an int in the range 0..0xFFFF; empty input yields the seed.
    """
    crc = 0xFFFF
    for item in data:
        # Iterating bytes/bytearray yields ints; iterating str yields
        # one-character strings that still need ord().
        byte = item if isinstance(item, int) else ord(item)
        crc = (crc ^ (byte << 8)) & 0xFFFF
        for _ in range(8):
            if crc & 0x8000:
                crc = ((crc << 1) ^ 0x8005) & 0xFFFF
            else:
                crc = (crc << 1) & 0xFFFF
    return crc
def __init__(self, port, timeout=2):
    """Open the serial connection to the device.

    port    -- serial port name/path handed to serial.Serial.
    timeout -- read timeout in seconds (defaults to 2, the previously
               hard-coded value).

    The port is opened at 115200 baud, 8 data bits, no parity, 1 stop bit.
    """
    self.ser = serial.Serial(
        port=port,
        baudrate=115200,
        parity=serial.PARITY_NONE,
        stopbits=serial.STOPBITS_ONE,
        bytesize=serial.EIGHTBITS,
        timeout=timeout,
    )
    # message() reads self.encryption on every call; without a default any
    # command issued before start() fails with AttributeError.
    self.encryption = 0
    self.ecount = 0
    self.recvmsg = ""
def checkcrc(self, msg):
    """Check the two trailing CRC bytes of *msg*.

    msg -- data followed by the CRC low byte and then the CRC high byte.

    Returns 1 when the CRC computed by upcrc() over everything except the
    last two bytes matches them, otherwise 0. Input shorter than two bytes
    cannot carry a CRC and yields 0 instead of raising from ord('').
    """
    if len(msg) < 2:
        return 0
    crc = self.upcrc(msg[:-2])
    low_ok = (crc & 0xFF) == ord(msg[-2:-1])
    if low_ok and ((crc >> 8) & 0xFF) == ord(msg[-1:]):
        if self.debug:
            logging.debug("CRC correct")
        return 1
    return 0
def addcrc(self, msg):
    """Return *msg* with its CRC appended, low byte first then high byte."""
    checksum = self.upcrc(msg)
    low = chr(checksum & 0xFF)
    high = chr((checksum >> 8) & 0xFF)
    return msg + low + high
def message(self, msg, override=0):
    """Frame *msg* and write it to the serial port.

    msg      -- payload (command byte plus arguments).
    override -- when 0 and self.encryption is 1, the payload is first
                wrapped by encryptmessage() and then sent with override=1.

    The frame is 0x7F, then 0x00, the payload length, the payload and the
    CRC from addcrc(); every 0x7F after the leading one is doubled.
    """
    if self.encryption == 1 and override == 0:
        self.message(self.encryptmessage(msg), 1)
        return

    framed = self.addcrc("\x00" + chr(len(msg)) + msg)
    # Byte stuffing: double each 0x7F so it is not taken for a frame start.
    outmsg = "\x7F" + framed.replace('\x7F', "\x7F\x7F")

    if self.debug:
        dump = "".join("%02X " % ord(ch) for ch in outmsg)
        logging.debug("Sent: %s", dump)
    self.ser.write(outmsg)
def getrealmsg(self, l):
    """Read *l* un-stuffed bytes from the serial port.

    A 0x7F byte is treated as an escape prefix: it is dropped and the byte
    that follows is taken literally, so a doubled 0x7F counts as one byte.

    Returns the bytes read. The result is shorter than *l* when a read
    times out; reading stops at the first timeout instead of waiting one
    full timeout for every byte still outstanding.
    """
    parts = []
    escape = 0
    while l > 0:
        c = self.ser.read(1)
        if not c:
            # Timed out: nothing more is coming for this frame.
            break
        if c == '\x7F' and escape == 0:
            escape = 1
        else:
            parts.append(c)
            escape = 0
            l = l - 1
    return "".join(parts)
def getmsg(self):
    """Read one response frame from the serial port.

    Returns 1 when a frame with a valid CRC carries the 0xF0 status byte,
    either directly or after decrypting a 0x7E-prefixed payload with
    decryptmsg(); self.recvmsg then holds the data after the status byte.

    Returns 0 otherwise. On a timeout, a truncated frame, a bad CRC or an
    empty payload self.recvmsg stays ""; for a non-0xF0 status or a failed
    decryption it holds the raw payload.
    """
    def _hex(data):
        # Space-separated uppercase hex dump used by the log messages.
        return " ".join("%02X" % ord(n) for n in data)

    self.recvmsg = ""
    c = self.ser.read(1)
    if not c:
        # An empty read means the timeout expired; ord('') would raise.
        logging.debug("ERROR, timeout waiting for start of a message")
        return 0
    if c != '\x7f':
        logging.debug("ERROR, not start of a message: %02X", ord(c))
        return 0

    # message() stuffs every byte after the leading 0x7F, so header and CRC
    # are read through the same un-stuffing reader as the payload.
    header = self.getrealmsg(2)
    if len(header) < 2:
        logging.debug("ERROR, timeout while reading message header")
        return 0
    slaveid = ord(header[0])
    length = ord(header[1])
    msg = self.getrealmsg(length)
    chk = self.getrealmsg(2)
    if self.debug:
        logging.debug("Receive: %02X %02X %02X %s %s", ord(c), slaveid, length, _hex(msg), _hex(chk))
    if len(msg) < length or len(chk) < 2:
        logging.debug("ERROR, truncated message")
        return 0
    if not self.checkcrc(chr(slaveid) + chr(length) + msg + chk):
        return 0
    if not msg:
        # Zero-length payload: there is no status byte to inspect.
        return 0

    if msg[0] == "\xF0":
        self.recvmsg = msg[1:]
        return 1
    if msg[0] == "\x7E":
        logging.debug("Encrypted received")
        dmsg = self.decryptmsg(msg)
        if not dmsg:
            # decryptmsg() returns 0 when the inner CRC does not match.
            logging.debug("ERROR message: %s", _hex(msg))
            self.recvmsg = msg
            return 0
        logging.debug("Decrypted message: %s", _hex(dmsg))
        if dmsg[0] == "\xF0":
            self.recvmsg = dmsg[1:]
            return 1
    logging.debug("ERROR message: %s", _hex(msg))
    self.recvmsg = msg
    return 0
def dumpmymsg(self,msg):
print "Message: "," ".join(format("%02X"%ord(n)) for n in msg)
def dumpmsg(self):
print "Message: "," ".join(format("%02X"%ord(n)) for n in self.recvmsg)
def cmd_reset(self):
    """Send SSP_RESET, then sleep 5 seconds; no response is read."""
    reset_command = self.SSP_RESET
    self.message(reset_command)
    # presumably gives the device time to restart -- TODO confirm
    time.sleep(5)
def cmd_serial(self):
    """Send SSP_SERIAL and return the status reported by getmsg()."""
    self.message(self.SSP_SERIAL)
    status = self.getmsg()
    return status
def cmd_sync(self):
    """Send SSP_SYNC and return the status reported by getmsg()."""
    self.message(self.SSP_SYNC)
    status = self.getmsg()
    return status
def cmd_len(self, extra):
    """Send SSP_LEN followed by *extra* and return getmsg()'s status.

    extra -- string appended verbatim after the command byte.
    """
    command = self.SSP_LEN + "" + extra
    self.message(command)
    return self.getmsg()
def cmd_setup(self):
    """Send SSP_SETUP and return the status reported by getmsg()."""
    self.message(self.SSP_SETUP)
    status = self.getmsg()
    return status
def cmd_firmware(self):
    """Send SSP_FIRMWARE and return the status reported by getmsg()."""
    self.message(self.SSP_FIRMWARE)
    status = self.getmsg()
    return status
def cmd_serial(self):
    """Send SSP_SERIAL and return the status reported by getmsg().

    NOTE(review): an identical cmd_serial is defined earlier in this
    class; this later definition is the one that takes effect.
    """
    self.message(self.SSP_SERIAL)
    status = self.getmsg()
    return status
def cmd_dataset(self):
    """Send SSP_DATASET and return the status reported by getmsg()."""
    self.message(self.SSP_DATASET)
    status = self.getmsg()
    return status
def cmd_proto(self, proto):
    """Send SSP_PROTO followed by *proto* and return getmsg()'s status.

    proto -- string appended after the command byte; start() passes a
             single character chr(1)..chr(9).
    """
    command = self.SSP_PROTO + proto
    self.message(command)
    return self.getmsg()
def cmd_inhibit(self):
    """Send SSP_INHIBIT with the bytes FF FF and return getmsg()'s status."""
    # presumably a two-byte channel mask with every bit set -- TODO confirm
    command = self.SSP_INHIBIT + "\xFF\xFF"
    self.message(command)
    return self.getmsg()
def cmd_exhibit(self):
    """Send SSP_INHIBIT with the bytes 00 00 and return getmsg()'s status."""
    # presumably a two-byte channel mask with every bit clear -- TODO confirm
    command = self.SSP_INHIBIT + "\x00\x00"
    self.message(command)
    return self.getmsg()
def cmd_enable(self):
    """Send SSP_ENABLE, set self.enable to 1 and return getmsg()'s status.

    The flag is set before the response is read, so it is 1 even when the
    returned status is 0.
    """
    self.message(self.SSP_ENABLE)
    self.enable = 1
    status = self.getmsg()
    return status
def cmd_disable(self):
    """Send SSP_DISABLE, set self.enable to 0 and return getmsg()'s status.

    The flag is cleared before the response is read, so it is 0 even when
    the returned status is 0.
    """
    self.message(self.SSP_DISABLE)
    self.enable = 0
    status = self.getmsg()
    return status
def cmd_poll(self):
    """Send SSP_POLL and return the status reported by getmsg()."""
    self.message(self.SSP_POLL)
    status = self.getmsg()
    return status
def cmd_displayon(self):
    """Send SSP_DISPLAYON and return the status reported by getmsg()."""
    self.message(self.SSP_DISPLAYON)
    status = self.getmsg()
    return status
def cmd_displayoff(self):
    """Send SSP_DISPLAYOFF and return the status reported by getmsg()."""
    self.message(self.SSP_DISPLAYOFF)
    status = self.getmsg()
    return status
def cmd_setgenerator(self):
    """Pick a fresh 64-bit prime, store it in self.prime and send it.

    The prime is packed as an unsigned 64-bit little-endian value after
    SSP_SETGENERATOR. Returns the status reported by getmsg().
    """
    self.prime = Crypto.Util.number.getPrime(64)
    payload = pack("<Q", self.prime)
    self.message(self.SSP_SETGENERATOR + payload)
    return self.getmsg()
def cmd_setmodulus(self):
    """Pick a fresh 64-bit prime, store it in self.modulus and send it.

    The prime is packed as an unsigned 64-bit little-endian value after
    SSP_SETMODULUS. Returns the status reported by getmsg().
    """
    self.modulus = Crypto.Util.number.getPrime(64)
    payload = pack("<Q", self.modulus)
    self.message(self.SSP_SETMODULUS + payload)
    return self.getmsg()
def setrnd(self):
    """Choose the host's secret exponent for the key exchange.

    Stores a random 12-bit integer in self.random. The previous code then
    overwrote it with the constant 8, which made the negotiated key
    predictable; that override is removed.
    """
    # NOTE(review): 12 bits is a very small secret. It is kept because
    # cmd_keyex()/slaveinterkey() raise to this power; widen it only once
    # both use modular exponentiation.
    self.random = Crypto.Util.number.getRandomInteger(12)
def cmd_keyex(self):
    """Send the host's intermediate key for the key exchange.

    Computes self.hostinterkey = self.prime ** self.random mod self.modulus
    and sends it as an unsigned 64-bit little-endian value after
    SSP_KEYEXCHANGE. Returns the status reported by getmsg().
    """
    # Three-argument pow() reduces modulo self.modulus at every step; the
    # result equals pow(a, b) % m without building the full power first.
    self.hostinterkey = pow(self.prime, self.random, self.modulus)
    self.message(self.SSP_KEYEXCHANGE + pack("<Q", self.hostinterkey))
    return self.getmsg()
def slaveinterkey(self):
    """Derive the shared key from the device's key-exchange reply.

    Reads the device's intermediate key from self.recvmsg (unsigned 64-bit
    little-endian), stores it in self.slave_inter_key, sets
    self.key = slave_inter_key ** self.random mod self.modulus and prints
    the key.

    The intermediate value used to be stored in self.slaveinterkey, which
    replaced this method on the instance and made a second call fail with
    "'int' object is not callable"; it now has its own attribute name.
    """
    self.slave_inter_key = unpack("<Q", self.recvmsg)[0]
    # Modular exponentiation: same value as pow(a, b) % m, computed cheaply.
    self.key = pow(self.slave_inter_key, self.random, self.modulus)
    # Single-argument print: same output as before, and the form also
    # parses on Python 3.
    print("KEY:  %s" % (self.key,))
def encryptmessage(self, msg):
    """Wrap *msg* in an encrypted packet and return it prefixed with 0x7E.

    The plaintext is: payload length (1 byte), self.ecount (unsigned 32-bit
    little-endian), the payload, '0' characters as filler, and the CRC from
    addcrc(). The filler makes the total a multiple of 16 bytes. The
    plaintext is encrypted with self.cipher and self.ecount is incremented.
    """
    unpadded_len = 1 + 4 + len(msg) + 2
    padded_len = int(math.ceil(unpadded_len / 16.0) * 16)
    filler = ''.zfill(padded_len - unpadded_len)

    header = pack("<BI", len(msg), self.ecount)
    plaintext = self.addcrc(header + msg + filler)
    ciphertext = self.cipher.encrypt(plaintext)
    self.ecount += 1
    return "\x7E" + ciphertext
def decryptmsg(self, msg):
    """Decrypt a 0x7E-prefixed packet and return its payload.

    Everything after the first byte of *msg* is decrypted with self.cipher.
    When checkcrc() rejects the plaintext, 0 is returned. Otherwise the
    length byte and counter are unpacked (the counter is stored in
    self.count) and the first <length> payload bytes are returned.
    """
    plain = self.cipher.decrypt(msg[1:])
    if not self.checkcrc(plain):
        return 0
    # Layout: length (1 byte), counter (4 bytes), payload plus filler, CRC.
    layout = "<BI%ds2s" % (len(plain) - 7)
    reallen, self.count, payload, _crc = unpack(layout, plain)
    return payload[0:reallen]
def setupencryption(self):
    """Run the key exchange and switch message() to encrypted mode.

    Sequence: sync, set generator, sync, set modulus, sync, choose the
    host secret, key exchange, derive the key, sync.

    Returns 0 as soon as any command fails, leaving self.encryption
    untouched, and 1 on success. It previously fell off the end and
    returned None on success, which callers could not tell from failure.
    """
    handshake = (
        self.cmd_sync,
        self.cmd_setgenerator,
        self.cmd_sync,
        self.cmd_setmodulus,
        self.cmd_sync,
    )
    for step in handshake:
        if not step():
            return 0
    self.setrnd()
    if not self.cmd_keyex():
        return 0
    self.slaveinterkey()
    if not self.cmd_sync():
        return 0

    self.ecount = 0
    self.iv = 16 * '\x00' #Random.new().read(AES.block_size)
    # 128-bit key: the fixed key followed by the negotiated key, both
    # packed as unsigned 64-bit little-endian values.
    aes_key = pack("<QQ", self.fixedkey, self.key)
    # ECB mode takes no IV; passing one is rejected by PyCryptodome.
    self.cipher = AES.new(aes_key, AES.MODE_ECB)
    # Enable encryption only once the cipher exists, so a failure above
    # cannot leave message() encrypting without a cipher.
    self.encryption = 1
    return 1
def start(self):
    """Bring the device up and print its identification data.

    Steps: sync, probe protocol versions 1..9 until one is refused, set up
    encryption, then query and print firmware, dataset and serial number,
    with a sync after each step. Returns None; the method stops early as
    soon as a command fails.
    """
    self.encryption = 0
    # Defined up front so the print below works even when the device
    # refuses the very first protocol version.
    self.proto = 0
    if not self.cmd_sync(): return
    for i in range(1, 10):
        if not self.cmd_proto(chr(i)): break
        self.proto = i
    if not self.cmd_sync(): return
    # Prints use a single pre-formatted string: the output is unchanged and
    # the form also parses on Python 3.
    print("Protocol version: %s" % (self.proto,))
    # The result is deliberately not checked, as before: on failure
    # self.encryption stays 0 and the remaining commands go out in clear.
    self.setupencryption()
    if not self.cmd_firmware(): return
    print("Firmware: " + self.recvmsg)
    if not self.cmd_sync(): return
    if not self.cmd_dataset(): return
    print("Dataset: " + self.recvmsg)
    if not self.cmd_sync(): return
    if not self.cmd_serial(): return
    if len(self.recvmsg) < 4:
        logging.debug("ERROR, serial number response too short")
        return
    # The serial number is a 4-byte big-endian value. The native "L" format
    # is 8 bytes on 64-bit Linux and uses host byte order, so it either
    # raised struct.error or decoded the wrong value.
    self.serial = unpack(">I", self.recvmsg[:4])[0]
    print("Serial:  %s" % (self.serial,))
    if not self.cmd_sync(): return
pc-coholic
pushed a commit
that referenced
this issue
Jun 22, 2017
Support setting a timeout on the serial connection, and raise an error
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Bisher sind keine Routinen zur Verschlüsselung implementiert.
The text was updated successfully, but these errors were encountered: