import warnings
from functools import wraps
from itertools import chain
from flask import current_app, request
from werkzeug.datastructures import ImmutableDict
from werkzeug.exceptions import Forbidden
from werkzeug.local import LocalProxy
from .additional import Additional, AdditionalManager
from .overrides import Override, OverrideManager
__all__ = ("Allows", "allows")
[docs]class Allows(object):
"""
The Flask-Allows extension object used to control defaults and drive
behavior.
:param app: Optional. Flask application instance.
:param identity_loader: Optional. Callable that will load the current user
:param throws: Optional. Exception type to raise by default when
authorization fails.
:param on_fail: Optional. A value to return or function to call when
authorization fails.
"""
def __init__(self, app=None, identity_loader=None, throws=Forbidden, on_fail=None):
self._identity_loader = identity_loader
self.throws = throws
self.on_fail = _make_callable(on_fail)
self.overrides = OverrideManager()
self.additional = AdditionalManager()
if app:
self.init_app(app)
[docs] def init_app(self, app):
"""
Initializes the Flask-Allows object against the provided application
"""
if not hasattr(app, "extensions"): # pragma: no cover
app.extensions = {}
app.extensions["allows"] = self
@app.before_request
def start_context(*a, **k):
self.overrides.push(Override())
self.additional.push(Additional())
@app.after_request
def cleanup(response):
self.clear_all_overrides()
self.clear_all_additional()
return response
[docs] def requires(self, *requirements, **opts):
"""
Decorator to enforce requirements on routes
:param requirements: Collection of requirements to impose on view
:param throws: Optional, keyword only. Exception to throw for this
route, if provided it takes precedence over the exception stored
on the instance
:param on_fail: Optional, keyword only. Value or function to use as
the on_fail for this route, takes precedence over the on_fail
configured on the instance.
"""
identity = opts.get("identity")
on_fail = opts.get("on_fail")
throws = opts.get("throws")
def decorator(f):
@wraps(f)
def allower(*args, **kwargs):
result = self.run(
requirements,
identity=identity,
on_fail=on_fail,
throws=throws,
f_args=args,
f_kwargs=kwargs,
)
# authorization failed
if result is not None:
return result
return f(*args, **kwargs)
return allower
return decorator
[docs] def identity_loader(self, f):
"""
Used to provide an identity loader after initialization of the
extension.
Can be used as a method::
allows.identity_loader(lambda: a_user)
Or as a decorator::
@allows.identity_loader
def load_user():
return a_user
If an identity loader is provided at initialization, this method
will overwrite it.
:param f: Callable to load the current user
"""
self._identity_loader = f
return f
[docs] def fulfill(self, requirements, identity=None):
"""
Checks that the provided or current identity meets each requirement
passed to this method.
This method takes into account both additional and overridden
requirements, with overridden requirements taking precedence::
allows.additional.push(Additional(Has('foo')))
allows.overrides.push(Override(Has('foo')))
allows.fulfill([], user_without_foo) # return True
:param requirements: The requirements to check the identity against.
:param identity: Optional. Identity to use in place of the current
identity.
"""
identity = identity or self._identity_loader()
if self.additional.current:
all_requirements = chain(iter(self.additional.current), requirements)
else:
all_requirements = iter(requirements)
if self.overrides.current is not None:
all_requirements = (
r for r in all_requirements if r not in self.overrides.current
)
return all(_call_requirement(r, identity, request) for r in all_requirements)
[docs] def clear_all_overrides(self):
"""
Helper method to remove all override contexts, this is called automatically
during the after request phase in Flask. However it is provided here
if override contexts need to be cleared independent of the application
context.
If an override context is found that originated from an OverrideManager
instance not controlled by the Allows object, a ``RuntimeError``
will be raised.
"""
while self.overrides.current is not None:
self.overrides.pop()
[docs] def clear_all_additional(self):
"""
Helper method to remove all additional contexts, this is called
automatically during the after request phase in Flask. However it is
provided here if additional contexts need to be cleared independent of
the request cycle.
If an additional context is found that originated from an
AdditionalManager instance not controlled by the Allows object, a
``RuntimeError`` will be raised.
"""
while self.additional.current is not None:
self.additional.pop()
[docs] def run(
self,
requirements,
identity=None,
throws=None,
on_fail=None,
f_args=(),
f_kwargs=ImmutableDict(), # noqa: B008
use_on_fail_return=True,
):
"""
Used to preform a full run of the requirements and the options given,
this method will invoke on_fail and/or throw the appropriate exception
type. Can be passed arguments to call on_fail with via f_args (which are
passed positionally) and f_kwargs (which are passed as keyword).
:param requirements: The requirements to check
:param identity: Optional. A specific identity to use for the check
:param throws: Optional. A specific exception to throw for this check
:param on_fail: Optional. A callback to invoke after failure,
alternatively a value to return when failure happens
:param f_args: Positional arguments to pass to the on_fail callback
:param f_kwargs: Keyword arguments to pass to the on_fail callback
:param use_on_fail_return: Boolean (default True) flag to determine
if the return value should be used. If true, the return value
will be considered, else failure will always progress to
exception raising.
"""
throws = throws or self.throws
on_fail = _make_callable(on_fail) if on_fail is not None else self.on_fail
if not self.fulfill(requirements, identity):
result = on_fail(*f_args, **f_kwargs)
if use_on_fail_return and result is not None:
return result
raise throws
def __get_allows():
"Internal helper"
try:
return current_app.extensions["allows"]
except (AttributeError, KeyError):
raise RuntimeError("Flask-Allows not configured against current app")
def _make_callable(func_or_value):
if not callable(func_or_value):
return lambda *a, **k: func_or_value
return func_or_value
def _call_requirement(req, user, request):
try:
return req(user)
except TypeError:
warnings.warn(
"{!r}: Passing request to requirements is now deprecated"
" and will be removed in 1.0".format(req),
DeprecationWarning,
stacklevel=2,
)
return req(user, request)
allows = LocalProxy(__get_allows, name="flask-allows")