From 63f47c31bd361e2b02cf19fc55eed62d56f846b6 Mon Sep 17 00:00:00 2001 From: Kamil Kosek <3264948+kamilkosek@users.noreply.github.com> Date: Tue, 22 Oct 2024 11:59:43 +0200 Subject: [PATCH] Implement session-based login with OTP --- requirements.txt | 1 + src/cloudsigma/generic.py | 23 ++++++++++++++++------- 2 files changed, 17 insertions(+), 7 deletions(-) diff --git a/requirements.txt b/requirements.txt index 062ffcb..757f4b6 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,3 +8,4 @@ simplejson>=3.17.0 six>=1.14.0 websocket-client>=0.57.0 nose>=1.3.7 +pyotp diff --git a/src/cloudsigma/generic.py b/src/cloudsigma/generic.py index ecd7b25..7203e19 100644 --- a/src/cloudsigma/generic.py +++ b/src/cloudsigma/generic.py @@ -11,6 +11,8 @@ from builtins import object from builtins import str from future import standard_library +import pyotp + standard_library.install_aliases() @@ -80,6 +82,7 @@ def __init__( api_endpoint=None, username=None, password=None, + secret=None, login_method=LOGIN_METHOD_BASIC, request_log_level=None ): @@ -87,6 +90,7 @@ def __init__( else config['api_endpoint'] self.username = username if username else config['username'] self.password = password if password else config['password'] + self.secret = secret if secret else config['secret'] self.login_method = config.get('login_method', login_method) assert self.login_method in self.LOGIN_METHODS, \ 'Invalid value %r for login_method' % (login_method,) @@ -99,7 +103,7 @@ def __init__( if self.request_log_level: self.request_log_level = self.request_log_level.upper() - if login_method == self.LOGIN_METHOD_SESSION: + if self.login_method == self.LOGIN_METHOD_SESSION: self._login_session() def _login_session(self): @@ -107,12 +111,14 @@ def _login_session(self): self._session = requests.Session() full_url = self._get_full_url('/accounts/action/') kwargs = self._get_req_args(query_params={'do': 'login'}) - data = simplejson.dumps( - { - "username": self.username, - "password": self.password - } - ) + login_object = { + "username": self.username, + "password": self.password + } + if self.secret: + otp = pyotp.TOTP(self.secret).now() + login_object["otp"] = otp + data = simplejson.dumps(login_object) res = self.http.post(full_url, data=data, **kwargs) self._process_response(res) csrf_token = res.cookies['csrftoken'] @@ -207,6 +213,9 @@ def http(self): def get(self, url, query_params=None, return_list=False): kwargs = self._get_req_args(query_params=query_params) + if self.login_method == self.LOGIN_METHOD_SESSION and self.secret: + kwargs['headers'].update({'X-CSRFToken':self._session.headers['x-csrftoken']}) + kwargs['headers'].update({'OTP':pyotp.TOTP(self.secret).now()}) self.resp = self.http.get(self._get_full_url(url), **kwargs) return self._process_response(self.resp, return_list)