Source code for flask_rbac_icdc.rbac

"""
This module provides role-based access control (RBAC) functionality for Flask applications.

The RBAC module validates the role of the subject based on
the role name provided in the authentication headers. The role name is validated against
the roles defined in the RBAC policy configuration.

Example:

.. code-block:: python

  rbac = RBAC(rbac_config_path, Account)

If it is necessary to implement more advanced conditional role assignment, you can override 
the `RbacAccount.get_role` method to achieve this.
"""

from abc import abstractmethod
from enum import Enum
from functools import wraps
import json
import os
from typing import Dict
import yaml
from flask import request, abort


[docs] class RbacAccount: """ Abstract base class that defines the interface for RBAC account objects. This class serves as a contract for implementing role-based access control (RBAC) account functionality. Any concrete implementation must provide properties for account identification and name, as well as a method to determine the subject's role based on authentication information. Note: All subclasses must implement the abstract methods and properties defined here. Example: .. code-block:: python class Account(db.Model, RbacAccount): __tablename__ = "accounts" object_name = "accounts" id = Column(Integer, primary_key=True, autoincrement=True) name = Column(String(64), unique=True, nullable=False) # ...Other account properties here @classmethod def get_by_name(cls, account_name: str) -> Optional["Account"]: return cls.query.filter_by(name=account_name).first() def get_role(self, requested_role: str) -> str: operator = is_operator(self.name, requested_role) if requested_role == "operator" and not operator: raise PermissionException("You are not operator") if operator: return "operator" return requested_role """ __abstract__ = True @property @abstractmethod def id(self) -> int: """The ID of the account.""" @property @abstractmethod def name(self) -> str: """The name of the account."""
[docs] @classmethod @abstractmethod def get_by_name(cls, account_name: str) -> "RbacAccount": """ Retrieve an account by its name. Args: account_name (str): The name of the account to retrieve. Returns: RbacAccount: Account instanse or None Example: .. code-block:: python @classmethod def get_by_name(cls, account_name: str) -> Optional["Account"]: return cls.query.filter_by(name=account_name).first() """ raise NotImplementedError("Subclasses must implement this method.")
[docs] @abstractmethod def get_role(self, requested_role: str) -> str: """ Determines the effective role of the account based on provided authentication information. Note: This is an abstract method that can be implemented by subclasses. This method can be used for more complex checks on a requested role or for conditional granting of another role for the subject. Args: requested_role (str): The role identifier provided in authentication headers. Returns: str: The granted role value to be used for this subject. Raises: PermissionException: This method should raise this error, if the provided role is invalid or not allowed for this account. """ return requested_role
[docs] class PermissionException(Exception): """ Raised when subject trying to perform an operation without the access rights """ def __init__(self, message): self.message = message super().__init__(message)
[docs] class Subject: """ Class represents a subject in the system, combining their account, role, and access permissions. It facilitates role-based access control (RBAC) by validating whether a user can perform a specific action and defining scope-based access restrictions. Attributes: account (RbacAccount): The account associated with this subject. account_id (int): The ID of the account. account_name (str): The name of the account. role (Enum): The role assigned to this subject. owner (str): The owner identifier for this subject. policy (dict): The policy configuration applied to this subject. """ def __init__(self, account: RbacAccount, role: Enum, owner, policy): """ Initialize a Subject with account, role, owner and permission details. Args: account (RbacAccount): The account associated with this subject. role (Enum): The role assigned to this subject. owner (str): The owner identifier for this subject. action (str): The action being performed, in format "object.permission". policy (dict): The policy configuration to apply. Raises: PermissionException: If the subject doesn't have permission for the requested action. """ self.account = account self.account_id = account.id self.account_name = account.name self.role = role self.owner = owner self.policy = policy
[docs] def filters(self, object_name: str): """ Get the filters that should be applied for this subject when accessing a specific object. Filters are used to restrict the scope of data access based on the subject's role and attributes. Args: object_name (str): The name of the object to get filters for. Returns: dict: A dictionary of filter key-value pairs to be applied when accessing the object. Example: .. code-block:: python # Example implementation in a SQLAlchemy base model class Base(db.Model): __abstract__ = True @property @abstractmethod def object_name(self): pass @classmethod def filtered(cls, subject: "Subject"): "Apply scope filters" return cls.query.filter_by(**subject.filters(cls.object_name)) """ return { key: getattr(self, value) for key, value in self.policy[object_name]["filters"].items() }
def __repr__(self): return ( f"Subject(\n" f" role={self.role},\n" f" account_id={self.account.id},\n" f" account_name={self.account.name},\n" f")" )
[docs] class RBAC: """ RBAC class to handle role-based access control. Attributes: policy (dict): The RBAC policy configuration loaded from the YAML file. roles (Enum): Enum of roles defined in the policy configuration. account_model (RbacAccount): The account model class to use for account operations. use_operator_group (bool): Flag to enable operator group functionality, is True by default. """ def __init__( self, config_path: str, account_model: RbacAccount, use_operator_group: bool = True, ): """ Initialize the RBAC instance. Args: config_path (str): Path to the YAML configuration file. """ self._roles = self.load_config(config_path) self._account_model = account_model self._use_operator_group = use_operator_group
[docs] def load_config(self, config_path): """ Load RBAC configuration from a YAML file. This function reads the role-based access control configuration from a YAML file and returns the parsed configuration as a dictionary. The configuration defines roles, their permissions for different resources, and any filters that should be applied when accessing those resources. Args: config_file (str): Path to the YAML configuration file. Returns: dict: Parsed RBAC configuration containing roles, permissions, and filters. Raises: FileNotFoundError: If the specified configuration file does not exist. yaml.YAMLError: If the configuration file contains invalid YAML syntax. """ if not os.path.exists(config_path): raise FileNotFoundError(f"RBAC config file not found: {config_path}") with open(config_path, encoding="utf-8") as file_handle: config: Dict[str, Dict[str, Dict]] = yaml.load( file_handle, Loader=yaml.FullLoader ) self._policy = config.get("roles", {}) roles = {role.upper(): role for role in self._policy.keys()} return Enum("Roles", roles)
def _check_permission(self, subject: Subject, action: str): """ Check if the subject has permission to perform the specified action. Args: subject (Subject): The subject requesting access. action (str): The action to check permissions for, in format "object.permission". Raises: PermissionException: If the subject doesn't have permission for the requested action. """ requested_object, requested_permission = action.rsplit(".", 1) # Check if object exists in policy if requested_object not in subject.policy: abort(403, f"Access to {action} forbidden for role {subject.role.name}") # Check if permission exists for the object permissions = subject.policy[requested_object]["permissions"] if requested_permission not in permissions: abort(403, f"Access to {action} forbidden for role {subject.role.name}")
[docs] def allow(self, action: str): """ A decorator to enforce role-based access control for an endpoint. This decorator validates that the requesting subject has the required permissions to access the endpoint by checking: 1. Account name from x-auth-account header 2. Role from x-auth-role header 3. Owner from x-auth-user header 4. Policy configuration for the current role from the RBAC configuration Args: action (str): The action to check permissions for, in format "object.permission" Returns: function: Decorated function that includes RBAC permission check Raises: Unauthorized 401: If account name or role headers are missing/invalid. Forbidden 403: If the subject does not have permission for the requested action. Example: .. code-block:: python @app.route('/users', methods=['GET']) @rbac.allow("users.read") def get_user(subject): # Function implementation pass """ def wrapper(func): @wraps(func) def wrap(*args, **kwargs): account_name = request.headers.get("x-auth-account") if not account_name: abort(401, "Account name is required in x-auth-account header.") account = self._account_model.get_by_name(account_name) if not account: abort(401, "Invalid auth parameters. Account name is not found.") owner = request.headers.get("x-auth-user") requested_role = request.headers.get("x-auth-role") if not requested_role: abort(401, "Role name is required in x-auth-role header.") try: granted_role = account.get_role(requested_role) # throws ValueError if role_name not in Roles enum subject_role = self._roles(granted_role) except ValueError: abort(401, "Invalid auth parameters. Role name is not found.") except PermissionException as e: abort(403, e.message) policy = self._policy.get(subject_role.value, {}) subject = Subject(account, subject_role, owner, policy) self._check_permission(subject, action) kwargs["subject"] = subject return func(*args, **kwargs) return wrap return wrapper
def __repr__(self): roles = json.dumps([role.name for role in self._roles]) policy = json.dumps(self._policy, indent=2) return f"RBAC(\n" f" roles={roles},\n" f" policy=\n{policy},\n" f")"