Improve Keycloak handling #111

@StefanSchuhart

Currently, authorisation tokens are verified online: every request triggers a connection to Keycloak. The more common approach is offline verification, where the token signature is checked locally against Keycloak's public keys. Additionally, the code contains helpers/utilities that connect to the Keycloak API with admin credentials, which are not really needed in production.

I suggest the following:

  • delete the unnecessary Keycloak admin code and remove the admin credentials from the UMP code (debugging Keycloak settings can be done during development)
  • cache public keys and validate tokens offline
  • remove the client ID dependency and switch to a resource server approach, so that multiple clients can authenticate against UMP, supporting different client types (admin client, web app client) or deployment stages (dev, test, QS, prod, ...)

For example:

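import jwt
import requests
from flask import g, request
from jwt.algorithms import RSAAlgorithm

@app.before_request
def check_jwt():
    # request.authorization is the parsed Authorization header;
    # .token holds the raw Bearer token (Werkzeug 2.3+)
    auth = request.authorization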
    
    if auth is not None:
        try:
            # Validate token without client_id dependency
            decoded = validate_token_multi_client(auth.token)
            g.auth_token = decoded
        except Exception as e:
            app.logger.error(f"Token validation failed: {e}")
            raise CustomException(
                message="Invalid or expired token",
                status_code=401,
            )
    else:
        g.auth_token = None

def validate_token_multi_client(token):
    """Validate token from any client in the realm"""
    try:
        # Get public keys (implement caching in production)
        jwks = get_keycloak_public_keys()
        
        # Decode header to get key ID
        header = jwt.get_unverified_header(token)
        key_id = header['kid']
        
        # Find the right public key
        public_key = None
        for key in jwks['keys']:
            if key['kid'] == key_id:
                public_key = RSAAlgorithm.from_jwk(key)
                break
                
        if not public_key:
            raise Exception("Public key not found")
            
        # Validate token
        decoded = jwt.decode(
            token,
            public_key,
            algorithms=['RS256'],
            audience=config.UMP_API_AUDIENCE,  # Your API identifier
            issuer=f"{config.UMP_KEYCLOAK_URL}/realms/{config.UMP_KEYCLOAK_REALM}"
        )
        
        return decoded
        
    except jwt.ExpiredSignatureError as e:
        # Chain the original error so the cause stays visible in logs
        raise Exception("Token has expired") from e
    except jwt.InvalidTokenError as e:
        raise Exception(f"Invalid token: {e}") from e

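# Fetch the realm's public keys (uncached here; cache and refresh periodically in production)
def get_keycloak_public_keys():
    jwks_url = f"{config.UMP_KEYCLOAK_URL}/realms/{config.UMP_KEYCLOAK_REALM}/protocol/openid-connect/certs"
    # Time out rather than hang the request if Keycloak is unreachable
    response = requests.get(jwks_url, timeout=5)
    response.raise_for_status()
    return response.json()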
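
The "implement caching in production" note above could be approached roughly as follows. This is a minimal, standard-library-only sketch and not part of the UMP code base: `JwksCache`, `ttl_seconds` and `min_refresh_seconds` are hypothetical names, and `fetch` is assumed to be a callable such as `get_keycloak_public_keys` that returns the parsed JWKS document.

```python
import threading
import time


class JwksCache:
    """Hypothetical in-memory cache for a JWKS document, indexed by key ID."""

    def __init__(self, fetch, ttl_seconds=300.0, min_refresh_seconds=30.0,
                 clock=time.monotonic):
        self._fetch = fetch                      # callable returning {"keys": [...]}
        self._ttl = ttl_seconds                  # maximum age before a refetch
        self._min_refresh = min_refresh_seconds  # throttle for unknown-kid refetches
        self._clock = clock                      # injectable for testing
        self._keys_by_kid = {}
        self._fetched_at = None
        self._lock = threading.Lock()

    def _refresh(self):
        # If fetch() raises, the previously cached keys are left untouched.
        jwks = self._fetch()
        self._keys_by_kid = {
            key["kid"]: key for key in jwks.get("keys", []) if "kid" in key
        }
        self._fetched_at = self._clock()

    def get_key(self, kid):
        """Return the JWK dict for `kid`, refetching when stale or unknown."""
        with self._lock:
            age = None if self._fetched_at is None else self._clock() - self._fetched_at
            stale = age is None or age >= self._ttl
            unknown = kid not in self._keys_by_kid
            # An unknown kid may indicate key rotation, so refetch, but only
            # if the last fetch is older than the minimum refresh interval.
            if stale or (unknown and age >= self._min_refresh):
                self._refresh()
            try:
                return self._keys_by_kid[kid]
            except KeyError:
                raise KeyError(f"no JWKS key with kid {kid!r}") from None
```

Assuming a module-level instance such as `jwks_cache = JwksCache(get_keycloak_public_keys)`, `validate_token_multi_client` could call `jwks_cache.get_key(key_id)` instead of fetching the JWKS on every request. Refetching on an unknown `kid` picks up rotated keys, while the minimum interval limits how often tokens with made-up key IDs can trigger a request to Keycloak.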

Labels: enhancement (New feature or request)
