11import logging
22import shlex
3- import threading
3+ import subprocess
44import sys
5- import httpx
5+ import threading
6+ from typing import Optional , Protocol , Union
67
7- import subprocess
8- from typing import List , Optional , Union , Protocol
8+ import httpx
99
1010logger = logging .getLogger ("dvc" )
1111
1212
1313def _log_with_thread (level : int , msg : str , * args ) -> None :
1414 """
1515 Universal helper to inject thread identity into logs.
16- Output format: [Thread-Name] [BearerAuthClient] Message...
16+ Output format: [Thread-Name] Message...
1717 """
1818 if logger .isEnabledFor (level ):
1919 thread_name = threading .current_thread ().name
20- logger .log (level , f"[{ thread_name } ] { msg } " , * args )
20+ log_fmt = f"[{ thread_name } ] " + msg
21+ logger .log (level , log_fmt , * args )
2122
2223
23- def execute_command (command : Union [List [str ], str ], timeout : int = 10 ) -> str :
24+ def execute_command (command : Union [list [str ], str ], timeout : int = 10 ) -> str :
2425 """Executes a command to retrieve the token."""
2526 if isinstance (command , str ):
2627 command = shlex .split (command )
2728
2829 try :
29- result = subprocess .run (
30+ result = subprocess .run ( # noqa: S603
3031 command ,
3132 shell = False ,
3233 capture_output = True ,
@@ -35,17 +36,20 @@ def execute_command(command: Union[List[str], str], timeout: int = 10) -> str:
3536 timeout = timeout ,
3637 encoding = "utf-8" ,
3738 )
38- token = result .stdout .strip ()
39- if not token :
40- raise ValueError ("Command executed successfully but returned an empty token." )
41- return token
42-
43- except (FileNotFoundError , subprocess .TimeoutExpired , subprocess .CalledProcessError , ValueError , Exception ) as e :
39+ except (
40+ FileNotFoundError ,
41+ subprocess .TimeoutExpired ,
42+ subprocess .CalledProcessError ,
43+ ValueError ,
44+ OSError ,
45+ ) as e :
4446 error_header = "\n " + "=" * 60
45- error_msg = f"{ error_header } \n [CRITICAL] Bearer Token Retrieval Failed.\n " \
46- f"DVC may misinterpret this as 'File Not Found' and skip files.\n " \
47- f"Command: { command } \n " \
48- f"Error: { e } "
47+ error_msg = (
48+ f"{ error_header } \n [CRITICAL] Bearer Token Retrieval Failed.\n "
49+ "DVC may misinterpret this as 'File Not Found' and skip files.\n "
50+ f"Command: { command } \n "
51+ f"Error: { e } "
52+ )
4953
5054 if isinstance (e , subprocess .CalledProcessError ):
5155 error_msg += f"\n Stderr: { e .stderr .strip ()} "
@@ -58,7 +62,12 @@ def execute_command(command: Union[List[str], str], timeout: int = 10) -> str:
5862
5963 # Re-raise the exception so the caller knows it failed.
6064 # DVC might catch this and swallow it, but we've done our duty to notify.
61- raise e
65+ raise
66+
67+ token = result .stdout .strip ()
68+ if not token :
69+ raise ValueError ("Command executed successfully but returned an empty token." )
70+ return token
6271
6372
6473class TokenSaver (Protocol ):
@@ -67,15 +76,22 @@ class TokenSaver(Protocol):
6776 def __call__ (self , token : Optional [str ]) -> None : ...
6877
6978
70- def safe_callback (cb : Optional [TokenSaver ], value : Optional [str ], operation : str ) -> None :
79+ def safe_callback (
80+ cb : Optional [TokenSaver ], value : Optional [str ], operation : str
81+ ) -> None :
7182 """Safely execute callback function with error handling"""
7283 if not cb :
7384 return
7485
7586 try :
7687 cb (value )
77- except Exception as e :
78- _log_with_thread (logging .WARNING , "[BearerAuthClient] Failed to %s token: %s" , operation , e )
88+ except Exception as e : # noqa: BLE001
89+ _log_with_thread (
90+ logging .WARNING ,
91+ "[BearerAuthClient] Failed to %s token: %s" ,
92+ operation ,
93+ e ,
94+ )
7995
8096
8197class BearerAuthClient (httpx .Client ):
@@ -89,34 +105,42 @@ class BearerAuthClient(httpx.Client):
89105 """
90106
91107 def __init__ (
92- self ,
93- bearer_token_command : str ,
94- save_token_cb : Optional [TokenSaver ] = None ,
95- ** kwargs ,
108+ self ,
109+ bearer_token_command : str ,
110+ save_token_cb : Optional [TokenSaver ] = None ,
111+ ** kwargs ,
96112 ):
97113 super ().__init__ (** kwargs )
98- if not isinstance (bearer_token_command , str ) or not bearer_token_command .strip ():
99- raise ValueError ("[BearerAuthClient] bearer_token_command must be a non-empty string" )
114+ if (
115+ not isinstance (bearer_token_command , str )
116+ or not bearer_token_command .strip ()
117+ ):
118+ raise ValueError (
119+ "[BearerAuthClient] bearer_token_command must be a non-empty string"
120+ )
100121 self .bearer_token_command = bearer_token_command
101122 self .save_token_cb = save_token_cb
102123 self ._token : Optional [str ] = None
103124 self ._lock = threading .Lock ()
104125
105126 def _refresh_token_locked (self ) -> None :
106127 """Execute token command and update state."""
107- _log_with_thread (logging .DEBUG , "[BearerAuthClient] Refreshing token via command..." )
128+ _log_with_thread (
129+ logging .DEBUG , "[BearerAuthClient] Refreshing token via command..."
130+ )
108131
109132 try :
110133 new_token = execute_command (self .bearer_token_command )
111- if not new_token :
112- raise ValueError (f"Bearer token command { self .bearer_token_command } returned empty token" )
134+ # execute_command guarantees non-empty string or raises ValueError
113135
114136 self ._token = new_token
115137 self .headers ["Authorization" ] = f"Bearer { new_token } "
116138 safe_callback (self .save_token_cb , new_token , "save" )
117139
118- _log_with_thread (logging .DEBUG , "[BearerAuthClient] Token refreshed successfully." )
119- except :
140+ _log_with_thread (
141+ logging .DEBUG , "[BearerAuthClient] Token refreshed successfully."
142+ )
143+ except Exception :
120144 # Clean up state on failure
121145 self ._token = None
122146 # Clear persisted token but don't fail the refresh operation
@@ -150,7 +174,9 @@ def request(self, *args, **kwargs) -> httpx.Response:
150174 if response .status_code != 401 :
151175 return response
152176
153- _log_with_thread (logging .DEBUG , "[BearerAuthClient] Received 401. Attempting recovery." )
177+ _log_with_thread (
178+ logging .DEBUG , "[BearerAuthClient] Received 401. Attempting recovery."
179+ )
154180 sent_auth_header = response .request .headers .get ("Authorization" )
155181
156182 try :
@@ -159,10 +185,15 @@ def request(self, *args, **kwargs) -> httpx.Response:
159185 if sent_auth_header == current_auth_header :
160186 self ._refresh_token_locked ()
161187 else :
162- _log_with_thread (logging .DEBUG ,
163- "[BearerAuthClient] Token already refreshed by another thread. Retrying." )
164- except Exception as e :
165- logger .error (f"[BearerAuthClient] Recovery failed: Token refresh threw exception: { e } " )
188+ _log_with_thread (
189+ logging .DEBUG ,
190+ "[BearerAuthClient] Token already refreshed by another thread. "
191+ "Retrying." ,
192+ )
193+ except Exception :
194+ logger .exception (
195+ "[BearerAuthClient] Recovery failed: Token refresh threw exception"
196+ )
166197 return response
167198
168199 # Retry the request with the new valid token
0 commit comments