1
- # Copyright (c) 2013-2024 by Ron Frederick <[email protected] > and others.
1
+ # Copyright (c) 2013-2025 by Ron Frederick <[email protected] > and others.
2
2
#
3
3
# This program and the accompanying materials are made available under
4
4
# the terms of the Eclipse Public License v2.0 which accompanies this
97
97
_KeyPairArg = Union ['SSHKeyPair' , _KeyArg , Tuple [_KeyArg , _CertArg ]]
98
98
KeyPairListArg = Union [_KeyPairArg , Sequence [_KeyPairArg ]]
99
99
100
+ _PassphraseCallable = Callable [[str ], BytesOrStr ]
101
+ _PassphraseArg = Optional [Union [_PassphraseCallable , BytesOrStr ]]
100
102
101
103
# Default file names in .ssh directory to read private keys from
102
104
_DEFAULT_KEY_FILES = (
@@ -192,6 +194,51 @@ def _wrap_base64(data: bytes, wrap: int = 64) -> bytes:
192
194
for i in range (0 , len (data ), wrap )) + b'\n '
193
195
194
196
197
+ def _resolve_passphrase (
198
+ passphrase : _PassphraseArg , filename : str ,
199
+ loop : Optional [asyncio .AbstractEventLoop ]) -> Optional [BytesOrStr ]:
200
+ """Resolve a passphrase used to encrypt/decrypt SSH private keys"""
201
+
202
+ resolved_passphrase : Optional [BytesOrStr ]
203
+
204
+ if callable (passphrase ):
205
+ resolved_passphrase = passphrase (filename )
206
+ else :
207
+ resolved_passphrase = passphrase
208
+
209
+ if loop and inspect .isawaitable (resolved_passphrase ):
210
+ resolved_passphrase = asyncio .run_coroutine_threadsafe (
211
+ resolved_passphrase , loop ).result ()
212
+
213
+ return resolved_passphrase
214
+
215
+
216
+ class _EncryptedKey :
217
+ """Encrypted SSH private key, decrypted just prior to use"""
218
+
219
+ def __init__ (self , key_data : bytes , filename : str ,
220
+ passphrase : _PassphraseArg ,
221
+ loop : Optional [asyncio .AbstractEventLoop ],
222
+ unsafe_skip_rsa_key_validation : bool ):
223
+ self ._key_data = key_data
224
+ self ._filename = filename
225
+ self ._passphrase = passphrase
226
+ self ._loop = loop
227
+ self ._unsafe_skip_rsa_key_validation = unsafe_skip_rsa_key_validation
228
+
229
+ def decrypt (self ) -> 'SSHKey' :
230
+ """Decrypt this encrypted key data and return an SSH private key"""
231
+
232
+ resolved_passphrase = _resolve_passphrase (self ._passphrase ,
233
+ self ._filename , self ._loop )
234
+
235
+ key = import_private_key (self ._key_data , resolved_passphrase ,
236
+ self ._unsafe_skip_rsa_key_validation )
237
+ key .set_filename (self ._filename )
238
+
239
+ return key
240
+
241
+
195
242
class KeyGenerationError (ValueError ):
196
243
"""Key generation error
197
244
@@ -2238,8 +2285,9 @@ class SSHLocalKeyPair(SSHKeyPair):
2238
2285
2239
2286
_key_type = 'local'
2240
2287
2241
- def __init__ (self , key : SSHKey , pubkey : Optional [SSHKey ] = None ,
2242
- cert : Optional [SSHCertificate ] = None ):
2288
+ def __init__ (self , key : SSHKey , pubkey : Optional [SSHKey ],
2289
+ cert : Optional [SSHCertificate ],
2290
+ enc_key : Optional [_EncryptedKey ]):
2243
2291
if pubkey and pubkey .public_data != key .public_data :
2244
2292
raise ValueError ('Public key mismatch' )
2245
2293
@@ -2254,10 +2302,11 @@ def __init__(self, key: SSHKey, pubkey: Optional[SSHKey] = None,
2254
2302
2255
2303
super ().__init__ (key .algorithm , key .algorithm , key .sig_algorithms ,
2256
2304
key .sig_algorithms , key .public_data , comment ,
2257
- cert , key .get_filename (), key .use_executor ,
2258
- key .use_webauthn )
2305
+ cert , key .get_filename (), key .use_executor or
2306
+ bool ( enc_key ), key .use_webauthn )
2259
2307
2260
2308
self ._key = key
2309
+ self ._enc_key = enc_key
2261
2310
2262
2311
def get_agent_private_key (self ) -> bytes :
2263
2312
"""Return binary encoding of keypair for upload to SSH agent"""
@@ -2273,6 +2322,12 @@ def get_agent_private_key(self) -> bytes:
2273
2322
def sign (self , data : bytes ) -> bytes :
2274
2323
"""Sign a block of data with this private key"""
2275
2324
2325
+ if self ._enc_key :
2326
+ self ._key = self ._enc_key .decrypt ()
2327
+ self ._enc_key = None
2328
+
2329
+ self .use_executor = self ._key .use_executor
2330
+
2276
2331
return self ._key .sign (data , self .sig_algorithm )
2277
2332
2278
2333
@@ -2368,7 +2423,7 @@ def _match_block(data: bytes, start: int, header: bytes,
2368
2423
"""Match a block of data wrapped in a header/footer"""
2369
2424
2370
2425
match = re .compile (b'^' + header [:5 ] + b'END' + header [10 :] +
2371
- rb'[ \t\r\f\v]*$' , re .M ).search (data , start )
2426
+ rb'[ \t\n\ r\f\v]*$' , re .M ).search (data , start )
2372
2427
2373
2428
if not match :
2374
2429
raise KeyImportError (f'Missing { fmt } footer' )
@@ -3203,21 +3258,6 @@ def import_private_key(
3203
3258
raise KeyImportError ('Invalid private key' )
3204
3259
3205
3260
3206
- def import_private_key_and_certs (
3207
- data : bytes , passphrase : Optional [BytesOrStr ] = None ,
3208
- unsafe_skip_rsa_key_validation : Optional [bool ] = None ) -> \
3209
- Tuple [SSHKey , Optional [SSHX509CertificateChain ]]:
3210
- """Import a private key and optional certificate chain"""
3211
-
3212
- key , end = _decode_private (data , passphrase ,
3213
- unsafe_skip_rsa_key_validation )
3214
-
3215
- if key :
3216
- return key , import_certificate_chain (data [end :])
3217
- else :
3218
- raise KeyImportError ('Invalid private key' )
3219
-
3220
-
3221
3261
def import_public_key (data : BytesOrStr ) -> SSHKey :
3222
3262
"""Import a public key
3223
3263
@@ -3339,20 +3379,6 @@ def read_private_key(
3339
3379
return key
3340
3380
3341
3381
3342
- def read_private_key_and_certs (
3343
- filename : FilePath , passphrase : Optional [BytesOrStr ] = None ,
3344
- unsafe_skip_rsa_key_validation : Optional [bool ] = None ) -> \
3345
- Tuple [SSHKey , Optional [SSHX509CertificateChain ]]:
3346
- """Read a private key and optional certificate chain from a file"""
3347
-
3348
- key , cert = import_private_key_and_certs (read_file (filename ), passphrase ,
3349
- unsafe_skip_rsa_key_validation )
3350
-
3351
- key .set_filename (filename )
3352
-
3353
- return key , cert
3354
-
3355
-
3356
3382
def read_public_key (filename : FilePath ) -> SSHKey :
3357
3383
"""Read a public key from a file
3358
3384
@@ -3512,31 +3538,37 @@ def load_keypairs(
3512
3538
"""
3513
3539
3514
3540
keys_to_load : Sequence [_KeyPairArg ]
3541
+ key_data : Optional [bytes ]
3542
+ key : Union ['SSHKey' , 'SSHKeyPair' ]
3515
3543
result : List [SSHKeyPair ] = []
3516
3544
3517
3545
certlist = load_certificates (certlist )
3518
3546
certdict = {cert .key .public_data : cert for cert in certlist }
3519
3547
3520
3548
if isinstance (keylist , (PurePath , str )):
3521
- try :
3522
- if callable (passphrase ):
3523
- resolved_passphrase = passphrase (str (keylist ))
3524
- else :
3525
- resolved_passphrase = passphrase
3549
+ data = read_file (keylist )
3550
+ key_data_list : List [bytes ] = []
3526
3551
3527
- if loop and inspect .isawaitable (resolved_passphrase ):
3528
- resolved_passphrase = asyncio .run_coroutine_threadsafe (
3529
- resolved_passphrase , loop ).result ()
3552
+ while data :
3553
+ fmt , _ , end = _match_next (data , b'PRIVATE KEY' )
3554
+ if fmt :
3555
+ key_data_list .append (data [:end ])
3530
3556
3531
- priv_keys = read_private_key_list (keylist , resolved_passphrase ,
3532
- unsafe_skip_rsa_key_validation )
3557
+ data = data [end :]
3533
3558
3534
- if len (priv_keys ) <= 1 :
3535
- keys_to_load = [keylist ]
3536
- passphrase = resolved_passphrase
3537
- else :
3538
- keys_to_load = priv_keys
3539
- except KeyImportError :
3559
+ if len (key_data_list ) > 1 :
3560
+ resolved_passphrase = _resolve_passphrase (passphrase ,
3561
+ str (keylist ), loop )
3562
+
3563
+ keys_to_load = []
3564
+
3565
+ for key_data in key_data_list :
3566
+ key = import_private_key (key_data , resolved_passphrase ,
3567
+ unsafe_skip_rsa_key_validation )
3568
+ key .set_filename (keylist )
3569
+
3570
+ keys_to_load .append (key )
3571
+ else :
3540
3572
keys_to_load = [keylist ]
3541
3573
elif isinstance (keylist , (tuple , bytes , SSHKey , SSHKeyPair )):
3542
3574
keys_to_load = [cast (_KeyPairArg , keylist )]
@@ -3545,61 +3577,37 @@ def load_keypairs(
3545
3577
3546
3578
for key_to_load in keys_to_load :
3547
3579
allow_certs = False
3548
- key_prefix = None
3549
- saved_exc = None
3580
+ key_data = None
3581
+ key_prefix = ''
3550
3582
pubkey_or_certs = None
3551
- pubkey_to_load : Optional [_KeyArg ] = None
3552
3583
certs_to_load : Optional [_CertArg ] = None
3553
- key : Union ['SSHKey' , 'SSHKeyPair' ]
3584
+ pubkey_to_load : Optional [_KeyArg ] = None
3585
+ saved_exc = None
3586
+ enc_key : Optional [_EncryptedKey ] = None
3554
3587
3555
3588
if isinstance (key_to_load , (PurePath , str , bytes )):
3556
3589
allow_certs = True
3557
3590
elif isinstance (key_to_load , tuple ):
3558
3591
key_to_load , pubkey_or_certs = key_to_load
3559
3592
3560
- try :
3561
- if isinstance (key_to_load , (PurePath , str )):
3562
- key_prefix = str (key_to_load )
3593
+ if isinstance (key_to_load , (PurePath , str )):
3594
+ key_prefix = str (key_to_load )
3595
+ key_data = read_file (key_to_load )
3596
+ elif isinstance (key_to_load , bytes ):
3597
+ key_data = key_to_load
3563
3598
3564
- if callable (passphrase ):
3565
- resolved_passphrase = passphrase (key_prefix )
3566
- else :
3567
- resolved_passphrase = passphrase
3599
+ certs : Optional [Sequence [SSHCertificate ]]
3568
3600
3569
- if loop and inspect .isawaitable (resolved_passphrase ):
3570
- resolved_passphrase = asyncio .run_coroutine_threadsafe (
3571
- resolved_passphrase , loop ).result ()
3601
+ if allow_certs :
3602
+ assert key_data is not None
3572
3603
3573
- if allow_certs :
3574
- key , certs_to_load = read_private_key_and_certs (
3575
- key_to_load , resolved_passphrase ,
3576
- unsafe_skip_rsa_key_validation )
3604
+ _ , _ , end = _match_next (key_data , b'PRIVATE KEY' )
3577
3605
3578
- if not certs_to_load :
3579
- certs_to_load = key_prefix + '-cert.pub'
3580
- else :
3581
- key = read_private_key (key_to_load , resolved_passphrase ,
3582
- unsafe_skip_rsa_key_validation )
3583
-
3584
- pubkey_to_load = key_prefix + '.pub'
3585
- elif isinstance (key_to_load , bytes ):
3586
- if allow_certs :
3587
- key , certs_to_load = import_private_key_and_certs (
3588
- key_to_load , passphrase ,
3589
- unsafe_skip_rsa_key_validation )
3590
- else :
3591
- key = import_private_key (key_to_load , passphrase ,
3592
- unsafe_skip_rsa_key_validation )
3593
- else :
3594
- key = key_to_load
3595
- except KeyImportError as exc :
3596
- if skip_public or \
3597
- (ignore_encrypted and str (exc ).startswith ('Passphrase' )):
3598
- continue
3599
-
3600
- raise
3606
+ certs_to_load = import_certificate_chain (key_data [end :])
3607
+ key_data = key_data [:end ]
3601
3608
3602
- certs : Optional [Sequence [SSHCertificate ]]
3609
+ if not certs_to_load :
3610
+ certs_to_load = key_prefix + '-cert.pub'
3603
3611
3604
3612
if pubkey_or_certs :
3605
3613
try :
@@ -3613,7 +3621,7 @@ def load_keypairs(
3613
3621
elif certs_to_load :
3614
3622
try :
3615
3623
certs = load_certificates (certs_to_load )
3616
- except (OSError , KeyImportError ):
3624
+ except (OSError , KeyImportError ) as exc :
3617
3625
certs = None
3618
3626
else :
3619
3627
certs = None
@@ -3628,16 +3636,58 @@ def load_keypairs(
3628
3636
pubkey = import_public_key (pubkey_to_load )
3629
3637
else :
3630
3638
pubkey = pubkey_to_load
3639
+
3640
+ saved_exc = None
3631
3641
except (OSError , KeyImportError ):
3632
3642
pubkey = None
3633
- else :
3643
+ elif key_prefix :
3644
+ try :
3645
+ pubkey = read_public_key (key_prefix + '.pub' )
3634
3646
saved_exc = None
3647
+ except (OSError , KeyImportError ):
3648
+ try :
3649
+ pubkey = read_public_key (key_prefix )
3650
+ saved_exc = None
3651
+ except (OSError , KeyImportError ):
3652
+ pubkey = None
3635
3653
else :
3636
3654
pubkey = None
3637
3655
3638
3656
if saved_exc :
3639
3657
raise saved_exc # pylint: disable=raising-bad-type
3640
3658
3659
+ if key_data is not None :
3660
+ try :
3661
+ unencrypted_key = import_private_key (
3662
+ key_data , None , unsafe_skip_rsa_key_validation )
3663
+ unencrypted_key .set_filename (key_prefix )
3664
+ except KeyImportError :
3665
+ unencrypted_key = None
3666
+
3667
+ if unencrypted_key :
3668
+ key = unencrypted_key
3669
+ elif callable (passphrase ) and key_prefix and (certs or pubkey ):
3670
+ enc_key = _EncryptedKey (key_data , key_prefix , passphrase , loop ,
3671
+ unsafe_skip_rsa_key_validation )
3672
+
3673
+ key = certs [0 ].key if certs else pubkey
3674
+ else :
3675
+ try :
3676
+ resolved_passphrase = _resolve_passphrase (passphrase ,
3677
+ key_prefix , loop )
3678
+
3679
+ key = import_private_key (key_data , passphrase ,
3680
+ unsafe_skip_rsa_key_validation )
3681
+ key .set_filename (key_prefix )
3682
+ except KeyImportError as exc :
3683
+ if skip_public or (ignore_encrypted and
3684
+ str (exc ).startswith ('Passphrase' )):
3685
+ continue
3686
+
3687
+ raise
3688
+ else :
3689
+ key = cast (Union [SSHKey , SSHKeyPair ], key_to_load )
3690
+
3641
3691
if not certs :
3642
3692
if isinstance (key , SSHKeyPair ):
3643
3693
pubdata = key .key_public_data
@@ -3660,9 +3710,9 @@ def load_keypairs(
3660
3710
result .append (key )
3661
3711
else :
3662
3712
if cert :
3663
- result .append (SSHLocalKeyPair (key , pubkey , cert ))
3713
+ result .append (SSHLocalKeyPair (key , pubkey , cert , enc_key ))
3664
3714
3665
- result .append (SSHLocalKeyPair (key , pubkey ))
3715
+ result .append (SSHLocalKeyPair (key , pubkey , None , enc_key ))
3666
3716
3667
3717
return result
3668
3718
0 commit comments