1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199
|
class BaseAuthentication:
    """
    Common base class that every authentication scheme should subclass.
    """

    def authenticate(self, request):
        """
        Authenticate the given request.

        Subclasses must override this method and return a two-tuple of
        `(user, token)`. The base implementation always raises
        `NotImplementedError`.
        """
        raise NotImplementedError(".authenticate() must be overridden.")

    def authenticate_header(self, request):
        """
        Return the string to use as the value of the `WWW-Authenticate`
        header in a `401 Unauthenticated` response, or `None` if the
        authentication scheme should produce `403 Permission Denied`
        responses instead.

        The base implementation returns `None`.
        """
        return None
class BasicAuthentication(BaseAuthentication):
    """
    HTTP Basic authentication against username/password.
    """
    # Realm advertised in the `WWW-Authenticate` challenge header.
    www_authenticate_realm = 'api'

    def authenticate(self, request):
        """
        Authenticate a request that uses the HTTP Basic scheme.

        Returns a `(user, None)` tuple if a correct username and password
        have been supplied using HTTP Basic authentication. Returns `None`
        if the request carries no `Authorization` header or the header uses
        a different scheme.

        Raises `AuthenticationFailed` if the header is malformed or the
        credentials are rejected.
        """
        auth = get_authorization_header(request).split()

        if not auth or auth[0].lower() != b'basic':
            return None

        if len(auth) == 1:
            msg = _('Invalid basic header. No credentials provided.')
            raise exceptions.AuthenticationFailed(msg)
        elif len(auth) > 2:
            msg = _('Invalid basic header. Credentials string should not contain spaces.')
            raise exceptions.AuthenticationFailed(msg)

        try:
            raw_credentials = base64.b64decode(auth[1])
            try:
                # Prefer UTF-8 so non-ASCII usernames/passwords survive intact.
                auth_decoded = raw_credentials.decode('utf-8')
            except UnicodeDecodeError:
                # Not valid UTF-8: fall back to the encoding this code
                # used previously.
                auth_decoded = raw_credentials.decode(HTTP_HEADER_ENCODING)

            # Split on the first colon only, so passwords may contain ':'.
            # A payload without any colon raises ValueError here and is
            # reported as a malformed header instead of being treated as an
            # empty password.
            userid, password = auth_decoded.split(':', 1)
        except (TypeError, ValueError, UnicodeDecodeError, binascii.Error):
            msg = _('Invalid basic header. Credentials not correctly base64 encoded.')
            raise exceptions.AuthenticationFailed(msg)

        return self.authenticate_credentials(userid, password, request)

    def authenticate_credentials(self, userid, password, request=None):
        """
        Authenticate the userid and password against username and password
        with optional request for context.

        Returns a `(user, None)` tuple on success. Raises
        `AuthenticationFailed` if no user matches the credentials or the
        matched user is not active.
        """
        credentials = {
            get_user_model().USERNAME_FIELD: userid,
            'password': password
        }
        user = authenticate(request=request, **credentials)

        if user is None:
            raise exceptions.AuthenticationFailed(_('Invalid username/password.'))

        if not user.is_active:
            raise exceptions.AuthenticationFailed(_('User inactive or deleted.'))

        return (user, None)

    def authenticate_header(self, request):
        """
        Return the `WWW-Authenticate` challenge value for the Basic scheme,
        built from `www_authenticate_realm`.
        """
        return 'Basic realm="%s"' % self.www_authenticate_realm
class SessionAuthentication(BaseAuthentication):
    """
    Authentication backed by Django's session framework.
    """

    def authenticate(self, request):
        """
        Return a `(user, None)` tuple when the wrapped request carries an
        active user; otherwise return `None`.

        CSRF validation is enforced before a user is returned.
        """
        # The user is read from the underlying request object
        # (`request._request`); a missing attribute counts as "no user".
        session_user = getattr(request._request, 'user', None)

        if session_user and session_user.is_active:
            self.enforce_csrf(request)
            return (session_user, None)

        # No user on the request, or the user is inactive.
        return None

    def enforce_csrf(self, request):
        """
        Run CSRF validation for session based authentication.

        Raises `PermissionDenied` when the check reports a failure reason.
        """
        # NOTE(review): CSRFCheck is constructed without a `get_response`
        # callable; newer Django middleware classes may require one --
        # confirm against the Django versions this file supports.
        csrf_check = CSRFCheck()
        csrf_check.process_request(request)
        failure_reason = csrf_check.process_view(request, None, (), {})
        if not failure_reason:
            return
        raise exceptions.PermissionDenied('CSRF Failed: %s' % failure_reason)
class TokenAuthentication(BaseAuthentication):
    """
    Simple token based authentication.

    Clients authenticate by sending the token key in the "Authorization"
    HTTP header, prefixed with the string "Token ".  For example:

        Authorization: Token 401f7ac837da42b97f613d789819ff93537bee6a
    """

    # Scheme keyword expected at the start of the Authorization header.
    keyword = 'Token'
    # Optional custom token model; when None, get_model() falls back to
    # rest_framework.authtoken.models.Token.
    model = None

    # A custom token model may be used, but must have the following
    # properties.
    #
    # * key -- The string identifying the token
    # * user -- The user to which the token belongs

    def get_model(self):
        """
        Return the token model class: `self.model` when set, otherwise the
        default `Token` model (imported lazily).
        """
        if self.model is None:
            from rest_framework.authtoken.models import Token
            return Token
        return self.model

    def authenticate(self, request):
        """
        Authenticate a request carrying `<keyword> <token>` credentials.

        Returns `None` when there is no Authorization header or it uses a
        different scheme; otherwise delegates to `authenticate_credentials`.
        Raises `AuthenticationFailed` for a malformed header.
        """
        header_parts = get_authorization_header(request).split()

        # The scheme comparison is case-insensitive and done on bytes.
        if not header_parts or header_parts[0].lower() != self.keyword.lower().encode():
            return None

        part_count = len(header_parts)
        if part_count == 1:
            raise exceptions.AuthenticationFailed(
                _('Invalid token header. No credentials provided.')
            )
        if part_count > 2:
            raise exceptions.AuthenticationFailed(
                _('Invalid token header. Token string should not contain spaces.')
            )

        try:
            key = header_parts[1].decode()
        except UnicodeError:
            raise exceptions.AuthenticationFailed(
                _('Invalid token header. Token string should not contain invalid characters.')
            )

        return self.authenticate_credentials(key)

    def authenticate_credentials(self, key):
        """
        Look up the token identified by `key` and return `(user, token)`.

        Raises `AuthenticationFailed` when no such token exists or the
        token's user is not active.
        """
        token_model = self.get_model()
        try:
            token = token_model.objects.select_related('user').get(key=key)
        except token_model.DoesNotExist:
            raise exceptions.AuthenticationFailed(_('Invalid token.'))

        if token.user.is_active:
            return (token.user, token)

        raise exceptions.AuthenticationFailed(_('User inactive or deleted.'))

    def authenticate_header(self, request):
        """
        Return the scheme keyword as the `WWW-Authenticate` header value.
        """
        return self.keyword
class RemoteUserAuthentication(BaseAuthentication):
    """
    REMOTE_USER authentication.

    To use this, set up your web server to perform authentication, which will
    set the REMOTE_USER environment variable. You will need to have
    'django.contrib.auth.backends.RemoteUserBackend' in your
    AUTHENTICATION_BACKENDS setting.
    """

    # Key looked up in `request.META` to obtain the remote user name.
    header = "REMOTE_USER"

    def authenticate(self, request):
        """
        Return a `(user, None)` tuple when the remote user named in
        `request.META[self.header]` authenticates to an active user;
        otherwise return `None`.
        """
        # Pass the request through to the auth backends, in the same way
        # BasicAuthentication.authenticate_credentials does.
        user = authenticate(request=request, remote_user=request.META.get(self.header))
        if user and user.is_active:
            return (user, None)
        return None
|