Skip to content

Commit

Permalink
Merge pull request #17 from Matej4545/jr/initial_tpm2_util
Browse files Browse the repository at this point in the history
Merge tpm2 util
  • Loading branch information
xrosch authored Apr 24, 2021
2 parents 401f2a3 + bec090a commit 56e3a8d
Show file tree
Hide file tree
Showing 5 changed files with 357 additions and 21 deletions.
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,8 @@ Windows: `./venv/Scripts/activate`
Linux: `source venv/bin/activate`

3. install requirements
`pip install -r requirements.txt`
`pip install -r requirements.txt`

#### Note on usage of physical TPM on Linux
It is possible that using physical TPM on Linux will require you to set your access permissions accordingly.
More on this issue [here](https://superuser.com/questions/1463364/accessing-trusted-platform-moduletpm-without-root-permission).
19 changes: 0 additions & 19 deletions ms_tpm2_test.py

This file was deleted.

2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
cryptography==2.8
cryptography==3.4.7
noiseprotocol==0.3.1
flask==1.1.2
jsonpickle==2.0.0
113 changes: 113 additions & 0 deletions tpm2_demo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
import TPM2.Tpm as Tpm2
import tpm2_util as tu
from TPM2.Crypt import Crypto
from typing import List


def helper_bitmap_to_pcr_nums(bitmap: List[int]) -> List[int]:
"""Get PCR numbers as in bitmap."""
pcr_nums = []
for j, byte in enumerate(bitmap):
for i in range(8):
if (byte >> i) & 1:
pcr_nums.append(8 * j + i)
return pcr_nums


def tpm2_demo():
"""A TPM2 demo showcasing some of its functionalities.
Uses a real TPM by default.
"""
# host="127.0.0.1", port=2321
# set 'useSimulator' to True to use a TCP TPM. In that case a simulator needs to be running
# use a real TPM when set to False
tpm = Tpm2.Tpm(useSimulator=False)

# TPM object needs to connect to a real TPM or a simulator
try:
tpm.connect()
except Exception as e:
print(e)
return

# get random bytes from TPM (this is limited by the size of the largest hash function, e.g., SHA256)
tpm_rnd_bytes = tu.get_random_bytes(tpm, 10)
print(f"{len(tpm_rnd_bytes)} random bytes:", tpm_rnd_bytes)

# get year of the TPM
print("Year:", tu.get_property_year(tpm))

# get string defined by the vendor
print("Vendor string 1:", tu.get_property_vendor_string(tpm))

# get the minimum of bitmap bytes for PCR selection, usually TPMs have 24 PCRs, i.e., 3 bytes
print("PCR min select:", tu.get_pcr_min_select(tpm))

# get plaintext SHA1 PCR values, optionally specify a selection bitmap, e.g. 0b00001111 for PCR0-3
pcr_bitmap = [0b11111111, 0b00111100, 0b11000011]
print("\nRequested PCR SHA1 hashes:")
plaintext_pcr = tu.get_pcr_values(tpm, pcr_bitmap)
pcr_nums = helper_bitmap_to_pcr_nums(pcr_bitmap)
for pcr_num, pcr_val in zip(pcr_nums, plaintext_pcr):
print(f"PCR{pcr_num:2}:", pcr_val.hex())

if len(plaintext_pcr) == 0:
print("No PCRs were selected or TPM returned empty buffer.")
return

# quote PCR values to securely get their digest, signature, and public key
# the signature's validity is checked
# the signature is also randomized with a nonce
# this can be used for a remote attestation
nonce = Crypto.randomBytes(20)
signed_pcr = tu.get_signed_pcr_values(tpm, nonce, pcr_bitmap)
if signed_pcr is None:
print("Oh no! Something malicious has happened!")
return

data, (ec_x, ec_y), (sig_r, sig_s) = signed_pcr
print("\nQuoted digest from TPM: ", data[-32:].hex())

# PCR digest comparison
# we may check the quoted digest ourselves
hs = Crypto.tpmAlgToPy(Tpm2.TPM_ALG_ID.SHA256)()
for pcr_val in plaintext_pcr:
hs.update(pcr_val)
print("Our recalculated digest:", hs.hexdigest())

print("\nECDSA (NIST-P256) public key: x = ", ec_x.hex(), ", y = ", ec_y.hex(), sep="")
print("Signature: r = ", sig_r.hex(), ", s = ", sig_s.hex(), sep="")
print(f"{len(nonce)} bytes large nonce: {nonce.hex()}, do not reuse :)")

# we may also validate the signature without TPM using cryptography.hazmat
print("\nSignature verification using hazmat:", end=" ")
if tu.ecdsa_validate(ec_x, ec_y, sig_r, sig_s, data):
print("OK")
else:
print("FAIL")

# the 'data' obtained from Quote contains quite a few values
print("\nRaw signed data:", data.hex())

# the binary data are hard to read, use AttestData for an easy access
attested_data = tu.AttestData(data)
print("\nSome values contained in signed data:")
print("Magic:", attested_data.magic())
print("Signing key name:", attested_data.signing_key_name())
print("Nonce:", attested_data.nonce())
print("Firmware version:", attested_data.firmware_version())
print("Hash ID (for PCR values, SHA1 probably):", attested_data.pcr_hash_id())
print("Digest of selected PCRs:", attested_data.digest())

print("\nSelected PCRs:")
sep = ""
for i, val in enumerate(attested_data.pcr_select()):
print(f"{sep}{i:2}: {str(val):5}", end="")
sep = " | "
if i % 8 == 7:
sep = ""
print()


if __name__ == "__main__":
tpm2_demo()
238 changes: 238 additions & 0 deletions tpm2_util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
from TPM2.Tpm import *
from TPM2.Crypt import Crypto
from typing import List

import cryptography.hazmat.primitives.asymmetric.ec as ec
from cryptography.hazmat.primitives.asymmetric.utils import encode_dss_signature
from cryptography.hazmat.primitives import hashes
from cryptography.exceptions import InvalidSignature


AIK_TEMPLATE = TPMT_PUBLIC(
TPM_ALG_ID.SHA256,
TPMA_OBJECT.restricted
| TPMA_OBJECT.sign
| TPMA_OBJECT.sensitiveDataOrigin
| TPMA_OBJECT.fixedTPM
| TPMA_OBJECT.fixedParent
| TPMA_OBJECT.userWithAuth,
None,
TPMS_ECC_PARMS(
TPMT_SYM_DEF_OBJECT(), TPMS_SIG_SCHEME_ECDSA(TPM_ALG_ID.SHA256), TPM_ECC_CURVE.NIST_P256, TPMS_NULL_KDF_SCHEME()
),
TPMS_ECC_POINT(),
)

AIK_HANDLE = TPM_HANDLE(0x81000000)
"""Currently not used."""


class AttestData:
"""A wrapper class for attested data."""

def __init__(self, data: bytearray):
attest = TPMS_ATTEST.fromBytes(data)

# ignores attributes: type, clockInfo
self.__magic = attest.magic
self.__signing_key_name = attest.qualifiedSigner
self.__nonce = attest.extraData
self.__firmware_version = attest.firmwareVersion
self.__pcr_hash_id = attest.attested.pcrSelect[0].hash
self.__pcr_select = attest.attested.pcrSelect
self.__digest = attest.attested.pcrDigest

def magic(self) -> int:
return self.__magic

def signing_key_name(self) -> str:
return self.__signing_key_name.hex()

def nonce(self) -> str:
return self.__nonce.hex()

def firmware_version(self) -> int:
return self.__firmware_version

def pcr_hash_id(self) -> int:
return int(self.__pcr_hash_id)

def pcr_select(self) -> List[bool]:
bool_arr = 24 * [False]
for select in self.__pcr_select:
for j, byte in enumerate(select.pcrSelect):
for i in range(8):
bool_arr[8 * j + i] |= bool((byte >> i) & 1)
return bool_arr

def digest(self) -> str:
return self.__digest.hex()


def create_aik(tpm: Tpm) -> bool:
"""Create a persistent primary attestation identity key (AIK)."""
try:
key_res = tpm.CreatePrimary(TPM_HANDLE(TPM_RH.OWNER), TPMS_SENSITIVE_CREATE(), AIK_TEMPLATE, None, None)
except TpmError as tpm_e:
print(tpm_e)
return False

try:
tpm.EvictControl(TPM_HANDLE(TPM_RH.OWNER), key_res.handle, AIK_HANDLE)
except TpmError as tpm_e:
tpm.FlushContext(key_res.handle)
print(tpm_e)
return False

tpm.FlushContext(key_res.handle)
return True


def flush_aik(tpm: Tpm) -> bool:
"""Release the persistent primary attestation identity key (AIK)."""
try:
tpm.EvictControl(TPM_HANDLE(TPM_RH.OWNER), AIK_HANDLE, AIK_HANDLE)
except TpmError as tpm_e:
print(tpm_e)
return False

return True


def ecdsa_validate(x: bytearray, y: bytearray, r: bytearray, s: bytearray, data: bytearray) -> bool:
"""Validates ECDSA (r, s) signature for NIST-P256 (SECP256R1) with public point (x, y)."""
pub_nums = ec.EllipticCurvePublicNumbers(
curve=ec.SECP256R1(), x=int.from_bytes(x, "big"), y=int.from_bytes(y, "big")
)
pub_key = pub_nums.public_key()

try:
pub_key.verify(
encode_dss_signature(r=int.from_bytes(r, "big"), s=int.from_bytes(s, "big")),
data,
ec.ECDSA(hashes.SHA256()),
)
except InvalidSignature as sig_e:
print(sig_e)
return False

return True


def tpm_self_verify_signature(tpm: Tpm, aik: TPM_HANDLE, sig: TPMU_SIGNATURE, data: bytearray) -> bool:
"""Verify the signature after Quote with loaded AIK (Attestation Identity Key)."""
hs = Crypto.tpmAlgToPy(TPM_ALG_ID.SHA256)()
hs.update(data)
try:
tpm.VerifySignature(aik, hs.digest(), sig)
except TpmError as tpm_e:
print(tpm_e)
return False
return True


def get_random_bytes(tpm: Tpm, n: int) -> List[int]:
"""Get 'n' random bytes."""
rnd_bytes = tpm.GetRandom(n)
return list(rnd_bytes)


def get_property_year(tpm: Tpm) -> int:
"""Get year of the TPM."""
cap_res = tpm.GetCapability(TPM_CAP.TPM_PROPERTIES, TPM_PT.YEAR, 1)
year = cap_res.capabilityData.tpmProperty[0].value
return year


def get_property_vendor_string(tpm: Tpm) -> str:
"""Get vendor string 1 from the TPM."""
cap_res = tpm.GetCapability(TPM_CAP.TPM_PROPERTIES, TPM_PT.VENDOR_STRING_1, 1)
string = cap_res.capabilityData.tpmProperty[0].value

res_bytes = string.to_bytes(int(string.bit_length() / 8 + 0.5), "big")
return "".join([chr(i) for i in res_bytes])


def get_pcr_min_select(tpm: Tpm) -> int:
"""Get the minimum of bytes for PCR Select bitmap."""
cap_res = tpm.GetCapability(TPM_CAP.TPM_PROPERTIES, TPM_PT.PCR_SELECT_MIN, 1)
return cap_res.capabilityData.tpmProperty[0].value


def helper_get_pcr_select_list(pcr_list: List[int], alg: TPM_ALG_ID) -> TPMS_PCR_SELECTION:
"""PCR selection bitmap."""
if pcr_list is None:
select_list = [255, 0, 0]
else:
select_list = pcr_list[:3]
pcr_len = len(select_list)
select_list.extend([0 for _ in range(3 - pcr_len)])

return TPMS_PCR_SELECTION(alg, bytes(select_list))


def get_pcr_values(tpm: Tpm, pcr_list: List[int] = None, alg: TPM_ALG_ID = TPM_ALG_ID.SHA1) -> List[bytearray]:
"""Get PCR SHA1 values as bytearrays.
pcr_list is a bitmap for PCR selection (7|6|5|4|3|2|1|0)(15|14|13|12|11|10|9|8)(23|22|21|20|19|18|17|16).
Default hash algorithm is SHA1. Other hash algorithms might not be supported.
Note this is prone to MitM attack."""
pcr_select = []
for i, byte in enumerate(pcr_list):
pcr_select_byte = helper_get_pcr_select_list(i * [0] + [byte], alg)
try:
pcr_res = tpm.PCR_Read([pcr_select_byte])
except TpmError as tpm_e:
print(tpm_e)
return []

pcr_select += [pcr_val.buffer for pcr_val in pcr_res.pcrValues]

return pcr_select


def get_signed_pcr_values(
tpm: Tpm, nonce: bytearray, pcr_list: List[int] = None, alg: TPM_ALG_ID = TPM_ALG_ID.SHA1
) -> (bytearray, (bytearray, bytearray), (bytearray, bytearray)) or None:
"""Get attested PCR values with signature over 'data'.
nonce should be a random number.
pcr_list is a bitmap for PCR selection (7|6|5|4|3|2|1|0)(15|14|13|12|11|10|9|8)(23|22|21|20|19|18|17|16).
Returns 'data, pub_key=(x, y), sig=(r, s)' as bytearrays or None if signature is invalid.
Last 32 bytes (SHA256) of 'data' is PCR digest.
"""
pcr_select = helper_get_pcr_select_list(pcr_list, alg)

try:
key_res = tpm.CreatePrimary(TPM_HANDLE(TPM_RH.OWNER), TPMS_SENSITIVE_CREATE(), AIK_TEMPLATE, None, None)
except TpmError as tpm_e:
print(tpm_e)
return

aik = key_res.handle # attestation identity key
try:
quote_res = tpm.Quote(aik, nonce, TPMS_SIG_SCHEME_ECDSA(TPM_ALG_ID.SHA256), [pcr_select])
except TpmError as tpm_e:
tpm.FlushContext(aik)
print(tpm_e)
return

# get quoted data as bytearray
buf = TpmBuffer()
quote_res.quoted.toTpm(buf)
buf.trim()

ec_x = key_res.outPublic.unique.x
ec_y = key_res.outPublic.unique.y
sig_r = quote_res.signature.signatureR
sig_s = quote_res.signature.signatureS
data = buf.buffer

# self-test
if not tpm_self_verify_signature(tpm, aik, quote_res.signature, data):
tpm.FlushContext(aik)
return

# unload AIK
tpm.FlushContext(aik)

# data, public key, signature
return data, (ec_x, ec_y), (sig_r, sig_s)

0 comments on commit 56e3a8d

Please sign in to comment.