Skip to content

Move generate_access_token to test_common.test_utils #47712

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
13 changes: 2 additions & 11 deletions clients/python/test_python_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,11 @@
import sys
import time
import uuid
from typing import cast

import airflow_client.client
import pytest

from airflow.api_fastapi.app import create_app, get_auth_manager
from airflow.api_fastapi.auth.managers.simple.datamodels.login import LoginBody
from airflow.api_fastapi.auth.managers.simple.services.login import SimpleAuthManagerLogin
from airflow.api_fastapi.auth.managers.simple.simple_auth_manager import SimpleAuthManager
from tests_common.test_utils.api_client_helpers import generate_access_token

try:
# If you have rich installed, you will have nice colored output of the API responses
Expand Down Expand Up @@ -66,12 +62,7 @@
# Used to initialize FAB and the auth manager, necessary for creating the token.


create_app()
auth_manager = cast("get_auth_manager()", SimpleAuthManager)
users = auth_manager.get_users()
passwords = auth_manager.get_passwords(users)
username, password = next(iter(passwords.items()))
access_token = SimpleAuthManagerLogin.create_token(LoginBody(username=username, password=password))
access_token = generate_access_token("admin", "admin", "localhost:8080")
configuration = airflow_client.client.Configuration(
host="http://localhost:8080/api/v2",
)
Expand Down
51 changes: 51 additions & 0 deletions devel-common/src/tests_common/test_utils/api_client_helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry


def generate_access_token(username: str, password: str, host: str) -> str:
"""
Generate valid access token for the given username and password.

Note: API server is currently using Simple Auth Manager.

:param username: The username to use for the login
:param password: The password to use for the login
:param host: The host to use for the login
:return: The access token
"""
Retry.DEFAULT_BACKOFF_MAX = 32
# retry for rate limit errors (429) and server errors (500, 502, 503, 504)
retry = Retry(total=10, backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504])
# Backoff Retry Formula: min(1 × (2^(retry - 1)), 32) seconds
# 1 + 2 + 4 + 8 + 16 + 32 + 32 + 32 + 32 + 32 = 191 sec (~3.2 min)
session = requests.Session()
session.mount("http://", HTTPAdapter(max_retries=retry))
session.mount("https://", HTTPAdapter(max_retries=retry))
url = f"http://{host}/auth/token"
login_response = session.post(
url,
json={"username": username, "password": password},
)
access_token = login_response.json().get("access_token")

assert access_token, f"Failed to get JWT token from redirect url {url} with status code {login_response}"
return access_token
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
from docker_tests.command_utils import run_command
from docker_tests.constants import AIRFLOW_ROOT_PATH

from tests_common.test_utils.api_client_helpers import generate_access_token

# isort:on (needed to workaround isort bug)

DOCKER_COMPOSE_HOST_PORT = os.environ.get("HOST_PORT", "localhost:8080")
Expand All @@ -41,46 +43,16 @@
DAG_RUN_ID = "test_dag_run_id"


def get_jwt_token() -> str:
"""
Get the JWT token.

Note: API server is still using FAB Auth Manager.

Steps:
1. Get the login page to get the csrf token
- The csrf token is in the hidden input field with id "csrf_token"
2. Login with the username and password
- Must use the same session to keep the csrf token session
3. Extract the JWT token from the redirect url
- Expected to have a connection error
- The redirect url should have the JWT token as a query parameter

:return: The JWT token
"""
# get csrf token from login page
session = requests.Session()
url = f"http://{DOCKER_COMPOSE_HOST_PORT}/auth/token"
login_response = session.post(
url,
json={
"username": AIRFLOW_WWW_USER_USERNAME,
"password": AIRFLOW_WWW_USER_PASSWORD,
},
)
jwt_token = login_response.json().get("access_token")

assert jwt_token, f"Failed to get JWT token from redirect url {url} with status code {login_response}"
return jwt_token


def api_request(
method: str, path: str, base_url: str = f"http://{DOCKER_COMPOSE_HOST_PORT}/api/v2", **kwargs
) -> dict:
access_token = generate_access_token(
AIRFLOW_WWW_USER_USERNAME, AIRFLOW_WWW_USER_PASSWORD, DOCKER_COMPOSE_HOST_PORT
)
response = requests.request(
method=method,
url=f"{base_url}/{path}",
headers={"Authorization": f"Bearer {get_jwt_token()}", "Content-Type": "application/json"},
headers={"Authorization": f"Bearer {access_token}", "Content-Type": "application/json"},
**kwargs,
)
response.raise_for_status()
Expand Down
46 changes: 6 additions & 40 deletions kubernetes-tests/tests/kubernetes_tests/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
from urllib3.exceptions import MaxRetryError
from urllib3.util.retry import Retry

from tests_common.test_utils.api_client_helpers import generate_access_token

CLUSTER_FORWARDED_PORT = os.environ.get("CLUSTER_FORWARDED_PORT") or "8080"
KUBERNETES_HOST_PORT = (os.environ.get("CLUSTER_HOST") or "localhost") + ":" + CLUSTER_FORWARDED_PORT
EXECUTOR = os.environ.get("EXECUTOR")
Expand Down Expand Up @@ -127,45 +129,9 @@ def _delete_airflow_pod(name=""):
if names:
check_call(["kubectl", "delete", "pod", names[0]])

@staticmethod
def _get_jwt_token(username: str, password: str) -> str:
"""
Get the JWT token for the given username and password.

Note: API server is still using FAB Auth Manager.

Steps:
1. Login with the username and password
- Must use the same session to keep the csrf token session
2. Extract the JWT token from the auth/token url

:param session: The session to use for the request
:param username: The username to use for the login
:param password: The password to use for the login
:return: The JWT token
"""
# get csrf token from login page
Retry.DEFAULT_BACKOFF_MAX = 32
retry = Retry(total=10, backoff_factor=1)
# Backoff Retry Formula: min(1 × (2^(retry - 1)), 32) seconds
# 1 + 2 + 4 + 8 + 16 + 32 + 32 + 32 + 32 + 32 = 191 sec (~3.2 min)
session = requests.Session()
session.mount("http://", HTTPAdapter(max_retries=retry))
session.mount("https://", HTTPAdapter(max_retries=retry))
url = f"http://{KUBERNETES_HOST_PORT}/auth/token"
login_response = session.post(
url,
json={"username": username, "password": password},
)
jwt_token = login_response.json().get("access_token")

assert jwt_token, f"Failed to get JWT token from redirect url {url} with status code {login_response}"
return jwt_token

def _get_session_with_retries(self):
class JWTRefreshAdapter(HTTPAdapter):
def __init__(self, base_instance, **kwargs):
self.base_instance = base_instance
def __init__(self, **kwargs):
super().__init__(**kwargs)

def send(self, request, **kwargs):
Expand All @@ -176,7 +142,7 @@ def send(self, request, **kwargs):
jwt_token = None
while attempts < 5:
try:
jwt_token = self.base_instance._get_jwt_token("admin", "admin")
jwt_token = generate_access_token("admin", "admin", KUBERNETES_HOST_PORT)
break
except Exception:
attempts += 1
Expand All @@ -187,7 +153,7 @@ def send(self, request, **kwargs):
response = super().send(request, **kwargs)
return response

jwt_token = self._get_jwt_token("admin", "admin")
jwt_token = generate_access_token("admin", "admin", KUBERNETES_HOST_PORT)
session = requests.Session()
session.headers.update({"Authorization": f"Bearer {jwt_token}"})
retries = Retry(
Expand All @@ -196,7 +162,7 @@ def send(self, request, **kwargs):
status_forcelist=[404],
allowed_methods=Retry.DEFAULT_ALLOWED_METHODS | frozenset(["PATCH", "POST"]),
)
adapter = JWTRefreshAdapter(self, max_retries=retries)
adapter = JWTRefreshAdapter(max_retries=retries)
session.mount("http://", adapter)
session.mount("https://", adapter)
return session
Expand Down