|
| 1 | +"""Signer implementation for project sigstore. |
| 2 | +
|
| 3 | +Example: |
| 4 | +```python |
| 5 | +from sigstore.oidc import Issuer |
| 6 | +
|
| 7 | +from securesystemslib.signer import SigstoreKey, SigstoreSigner |
| 8 | +
|
| 9 | +# Create public key |
| 10 | +identity = "[email protected]" # change, unless you know my password |
| 11 | +issuer = "https://github.com/login/oauth" |
| 12 | +public_key = SigstoreKey.from_dict( |
| 13 | + "abcdefg", |
| 14 | + { |
| 15 | + "keytype": "sigstore-oidc", |
| 16 | + "scheme": "Fulcio", |
| 17 | + "keyval": { |
| 18 | + "issuer": issuer, |
| 19 | + "identity": identity, |
| 20 | + }, |
| 21 | + }, |
| 22 | +) |
| 23 | +
|
| 24 | +# Create signer |
| 25 | +issuer = Issuer.production() |
| 26 | +token = issuer.identity_token() # requires sign in with GitHub in a browser |
| 27 | +signer = SigstoreSigner(token, public_key) |
| 28 | +
|
| 29 | +# Sign |
| 30 | +signature = signer.sign(b"data") |
| 31 | +
|
| 32 | +# Verify |
| 33 | +public_key.verify_signature(signature, b"data") |
| 34 | +
|
| 35 | +``` |
| 36 | +
|
| 37 | +""" |
| 38 | + |
| 39 | +import io |
| 40 | +import logging |
| 41 | +from typing import Any, Dict, Optional |
| 42 | + |
| 43 | +from securesystemslib.exceptions import ( |
| 44 | + UnsupportedLibraryError, |
| 45 | + UnverifiedSignatureError, |
| 46 | + VerificationError, |
| 47 | +) |
| 48 | +from securesystemslib.signer._signer import ( |
| 49 | + Key, |
| 50 | + SecretsHandler, |
| 51 | + Signature, |
| 52 | + Signer, |
| 53 | +) |
| 54 | + |
| 55 | +IMPORT_ERROR = "sigstore library required to use 'sigstore-oidc' keys" |
| 56 | + |
| 57 | +logger = logging.getLogger(__name__) |
| 58 | + |
| 59 | + |
| 60 | +class SigstoreKey(Key): |
| 61 | + """Sigstore verifier. |
| 62 | +
|
| 63 | + NOTE: unstable API - routines and metadata formats may change! |
| 64 | + """ |
| 65 | + |
| 66 | + @classmethod |
| 67 | + def from_dict(cls, keyid: str, key_dict: Dict[str, Any]) -> "SigstoreKey": |
| 68 | + keytype = key_dict.pop("keytype") |
| 69 | + scheme = key_dict.pop("scheme") |
| 70 | + keyval = key_dict.pop("keyval") |
| 71 | + |
| 72 | + for content in ["identity", "issuer"]: |
| 73 | + if content not in keyval or not isinstance(keyval[content], str): |
| 74 | + raise ValueError( |
| 75 | + f"{content} string required for scheme {scheme}" |
| 76 | + ) |
| 77 | + |
| 78 | + return cls(keyid, keytype, scheme, keyval, key_dict) |
| 79 | + |
| 80 | + def to_dict(self) -> Dict: |
| 81 | + return { |
| 82 | + "keytype": self.keytype, |
| 83 | + "scheme": self.scheme, |
| 84 | + "keyval": self.keyval, |
| 85 | + **self.unrecognized_fields, |
| 86 | + } |
| 87 | + |
| 88 | + def verify_signature(self, signature: Signature, data: bytes) -> None: |
| 89 | + # pylint: disable=import-outside-toplevel,import-error |
| 90 | + result = None |
| 91 | + try: |
| 92 | + from sigstore.verify import VerificationMaterials, Verifier |
| 93 | + from sigstore.verify.policy import Identity |
| 94 | + from sigstore_protobuf_specs.dev.sigstore.bundle.v1 import Bundle |
| 95 | + |
| 96 | + verifier = Verifier.production() |
| 97 | + identity = Identity( |
| 98 | + identity=self.keyval["identity"], issuer=self.keyval["issuer"] |
| 99 | + ) |
| 100 | + bundle = Bundle().from_dict(signature.unrecognized_fields["bundle"]) |
| 101 | + materials = VerificationMaterials.from_bundle( |
| 102 | + input_=io.BytesIO(data), bundle=bundle, offline=True |
| 103 | + ) |
| 104 | + result = verifier.verify(materials, identity) |
| 105 | + |
| 106 | + except Exception as e: |
| 107 | + logger.info("Key %s failed to verify sig: %s", self.keyid, str(e)) |
| 108 | + raise VerificationError( |
| 109 | + f"Unknown failure to verify signature by {self.keyid}" |
| 110 | + ) from e |
| 111 | + |
| 112 | + if not result: |
| 113 | + logger.info( |
| 114 | + "Key %s failed to verify sig: %s", |
| 115 | + self.keyid, |
| 116 | + getattr(result, "reason", ""), |
| 117 | + ) |
| 118 | + raise UnverifiedSignatureError( |
| 119 | + f"Failed to verify signature by {self.keyid}" |
| 120 | + ) |
| 121 | + |
| 122 | + |
| 123 | +class SigstoreSigner(Signer): |
| 124 | + """Sigstore signer. |
| 125 | +
|
| 126 | + NOTE: unstable API - routines and metadata formats may change! |
| 127 | + """ |
| 128 | + |
| 129 | + def __init__(self, token: str, public_key: Key): |
| 130 | + # TODO: Vet public key |
| 131 | + # - signer eligible for keytype/scheme? |
| 132 | + # - token matches identity/issuer? |
| 133 | + self.public_key = public_key |
| 134 | + self._token = token |
| 135 | + |
| 136 | + @classmethod |
| 137 | + def from_priv_key_uri( |
| 138 | + cls, |
| 139 | + priv_key_uri: str, |
| 140 | + public_key: Key, |
| 141 | + secrets_handler: Optional[SecretsHandler] = None, |
| 142 | + ) -> "SigstoreSigner": |
| 143 | + raise NotImplementedError() |
| 144 | + |
| 145 | + def sign(self, payload: bytes) -> Signature: |
| 146 | + """Signs payload using the OIDC token on the signer instance. |
| 147 | +
|
| 148 | + Arguments: |
| 149 | + payload: bytes to be signed. |
| 150 | +
|
| 151 | + Raises: |
| 152 | + Various errors from sigstore-python. |
| 153 | +
|
| 154 | + Returns: |
| 155 | + Signature. |
| 156 | +
|
| 157 | + NOTE: The relevant data is in `unrecognized_fields["bundle"]`. |
| 158 | +
|
| 159 | + """ |
| 160 | + # pylint: disable=import-outside-toplevel |
| 161 | + try: |
| 162 | + from sigstore.sign import Signer as _Signer |
| 163 | + except ImportError as e: |
| 164 | + raise UnsupportedLibraryError(IMPORT_ERROR) from e |
| 165 | + |
| 166 | + signer = _Signer.production() |
| 167 | + result = signer.sign(io.BytesIO(payload), self._token) |
| 168 | + # TODO: Ask upstream if they can make this public |
| 169 | + bundle = result._to_bundle() # pylint: disable=protected-access |
| 170 | + |
| 171 | + return Signature( |
| 172 | + self.public_key.keyid, |
| 173 | + bundle.message_signature.signature.hex(), |
| 174 | + {"bundle": bundle.to_dict()}, |
| 175 | + ) |
0 commit comments