Source code for pyramid_authsanity.sources
from webob.cookies import JSONSerializer, SignedCookieProfile, SignedSerializer
from zope.interface import implementer
from .interfaces import IAuthSourceService
[docs]def SessionAuthSourceInitializer(value_key="sanity."):
""" An authentication source that uses the current session """
value_key = value_key + "value"
@implementer(IAuthSourceService)
class SessionAuthSource(object):
vary = []
def __init__(self, context, request):
self.request = request
self.session = request.session
self.cur_val = None
def get_value(self):
if self.cur_val is None:
self.cur_val = self.session.get(value_key, [None, None])
return self.cur_val
def headers_remember(self, value):
if self.cur_val is None:
self.cur_val = self.session.get(value_key, [None, None])
self.session[value_key] = value
return []
def headers_forget(self):
if self.cur_val is None:
self.cur_val = self.session.get(value_key, [None, None])
if value_key in self.session:
del self.session[value_key]
return []
return SessionAuthSource
[docs]def CookieAuthSourceInitializer(
secret,
cookie_name="auth",
secure=False,
max_age=None,
httponly=False,
path="/",
domains=None,
debug=False,
hashalg="sha512",
):
""" An authentication source that uses a unique cookie. """
@implementer(IAuthSourceService)
class CookieAuthSource(object):
vary = ["Cookie"]
def __init__(self, context, request):
self.domains = domains
if self.domains is None:
self.domains = []
self.domains.append(request.domain)
self.cookie = SignedCookieProfile(
secret,
"authsanity",
cookie_name,
secure=secure,
max_age=max_age,
httponly=httponly,
path=path,
domains=domains,
hashalg=hashalg,
)
# Bind the cookie to the current request
self.cookie = self.cookie.bind(request)
def get_value(self):
val = self.cookie.get_value()
if val is None:
return [None, None]
return val
def headers_remember(self, value):
return self.cookie.get_headers(value, domains=self.domains)
def headers_forget(self):
return self.cookie.get_headers(None, max_age=0)
return CookieAuthSource
[docs]def HeaderAuthSourceInitializer(secret, salt="sanity.header."):
""" An authentication source that uses the Authorization header. """
@implementer(IAuthSourceService)
class HeaderAuthSource(object):
vary = ["Authorization"]
def __init__(self, context, request):
self.request = request
self.cur_val = None
serializer = JSONSerializer()
self.serializer = SignedSerializer(
secret,
salt,
serializer=serializer,
)
def _get_authorization(self):
try:
type, token = self.request.authorization
return self.serializer.loads(token)
except Exception:
return None
def _create_authorization(self, value):
try:
return self.serializer.dumps(value)
except Exception:
return ""
def get_value(self):
if self.cur_val is None:
self.cur_val = self._get_authorization() or [None, None]
return self.cur_val
def headers_remember(self, value):
if self.cur_val is None:
self.cur_val = None
token = self._create_authorization(value)
auth_info = str(b"Bearer " + token, "latin-1", "strict")
return [("Authorization", auth_info)]
def headers_forget(self):
if self.cur_val is None:
self.cur_val = None
return []
return HeaderAuthSource