Django REST framework 认证和权限组件

认证与权限组件

身份验证(认证)是将传入请求与一组标识凭据(例如发起请求的用户,或请求所携带的签名令牌)关联起来的机制。随后,权限组件和限流组件根据这些凭据决定是否允许该请求。

简单来说就是:

  • 认证确定了你是谁
  • 权限确定你能不能访问某个接口
  • 限流(限制)确定你访问某个接口的频率上限

一、认证组件

REST framework 提供的开箱即用的身份验证方案

REST framework 提供了一些开箱即用的身份验证方案,并且还允许你实现自定义方案。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
########################### rest_framework.authentication ###########################
# Base class for every authentication scheme.
class BaseAuthentication:
    """
    All authentication classes should extend BaseAuthentication.
    """

    def authenticate(self, request):
        """
        Authenticate the request and return a two-tuple of (user, token).

        This base implementation always raises NotImplementedError, so
        subclasses must override it.
        """
        raise NotImplementedError(".authenticate() must be overridden.")

    def authenticate_header(self, request):
        """
        Return a string to be used as the value of the `WWW-Authenticate`
        header in a `401 Unauthenticated` response, or `None` if the
        authentication scheme should return `403 Permission Denied` responses.
        """
        # Explicit default: the base scheme supplies no challenge header.
        return None


# HTTP Basic authentication: the username/password pair is read from the
# request's authorization header.
class BasicAuthentication(BaseAuthentication):
    """
    HTTP Basic authentication against username/password.
    """
    www_authenticate_realm = 'api'

    def authenticate(self, request):
        """
        Return a `(user, None)` pair when correct HTTP Basic credentials were
        supplied, or `None` when the request does not use the Basic scheme.

        Raises `AuthenticationFailed` when the Basic header is malformed.
        """
        header_parts = get_authorization_header(request).split()

        # No header, or a different scheme: this authenticator does not apply.
        if not header_parts or header_parts[0].lower() != b'basic':
            return None

        part_count = len(header_parts)
        if part_count == 1:
            msg = _('Invalid basic header. No credentials provided.')
            raise exceptions.AuthenticationFailed(msg)
        if part_count > 2:
            msg = _('Invalid basic header. Credentials string should not contain spaces.')
            raise exceptions.AuthenticationFailed(msg)

        try:
            decoded = base64.b64decode(header_parts[1]).decode(HTTP_HEADER_ENCODING)
        except (TypeError, UnicodeDecodeError, binascii.Error):
            msg = _('Invalid basic header. Credentials not correctly base64 encoded.')
            raise exceptions.AuthenticationFailed(msg)

        # Split on the first ':' only, so the password may itself contain ':'.
        userid, _separator, password = decoded.partition(':')
        return self.authenticate_credentials(userid, password, request)

    def authenticate_credentials(self, userid, password, request=None):
        """
        Authenticate the userid and password against username and password
        with optional request for context.

        Returns `(user, None)`; raises `AuthenticationFailed` when no user
        matches or the matched user is not active.
        """
        username_field = get_user_model().USERNAME_FIELD
        credentials = {username_field: userid, 'password': password}
        user = authenticate(request=request, **credentials)

        if user is None:
            raise exceptions.AuthenticationFailed(_('Invalid username/password.'))
        if not user.is_active:
            raise exceptions.AuthenticationFailed(_('User inactive or deleted.'))
        return (user, None)

    def authenticate_header(self, request):
        """Return the `WWW-Authenticate` value advertising the Basic realm."""
        realm = self.www_authenticate_realm
        return 'Basic realm="%s"' % realm


# Authentication through Django's session framework: first check for a
# logged-in user, then enforce CSRF validation.
class SessionAuthentication(BaseAuthentication):
    """
    Use Django's session framework for authentication.
    """

    def authenticate(self, request):
        """
        Returns a `(user, None)` pair if the request session currently has a
        logged in, active user. Otherwise returns `None`.
        """
        # The session-based user lives on the underlying HttpRequest object.
        session_user = getattr(request._request, 'user', None)

        if session_user and session_user.is_active:
            # Only authenticated requests are subject to the CSRF check.
            self.enforce_csrf(request)
            return (session_user, None)

        # Unauthenticated: CSRF validation not required.
        return None

    def enforce_csrf(self, request):
        """
        Enforce CSRF validation for session based authentication.

        Raises `PermissionDenied` when the CSRF check reports a failure reason.
        """
        # NOTE(review): the check is constructed without a get_response
        # argument; confirm this works on the Django version in use.
        csrf_check = CSRFCheck()
        # populates request.META['CSRF_COOKIE'], which is used in process_view()
        csrf_check.process_request(request)
        failure_reason = csrf_check.process_view(request, None, (), {})
        if not failure_reason:
            return
        # CSRF failed, bail with explicit error message
        raise exceptions.PermissionDenied('CSRF Failed: %s' % failure_reason)


# Token based authentication: the token must be sent in the request's
# authorization header.
class TokenAuthentication(BaseAuthentication):
    """
    Simple token based authentication.

    Clients should authenticate by passing the token key in the "Authorization"
    HTTP header, prepended with the string "Token ".  For example:

        Authorization: Token 401f7ac837da42b97f613d789819ff93537bee6a
    """

    keyword = 'Token'
    model = None

    def get_model(self):
        """Return the configured token model, defaulting to authtoken's Token."""
        if self.model is None:
            # Imported lazily, only when no custom model was configured.
            from rest_framework.authtoken.models import Token
            return Token
        return self.model

    # A custom token model may be used, but must have the following properties.
    #
    # * key -- The string identifying the token
    # * user -- The user to which the token belongs

    def authenticate(self, request):
        """
        Return `(user, token)` for a valid token header, or `None` when the
        header is absent or uses a different keyword.

        Raises `AuthenticationFailed` when the token header is malformed.
        """
        header_parts = get_authorization_header(request).split()

        if not header_parts or header_parts[0].lower() != self.keyword.lower().encode():
            return None

        part_count = len(header_parts)
        if part_count == 1:
            msg = _('Invalid token header. No credentials provided.')
            raise exceptions.AuthenticationFailed(msg)
        if part_count > 2:
            msg = _('Invalid token header. Token string should not contain spaces.')
            raise exceptions.AuthenticationFailed(msg)

        try:
            key = header_parts[1].decode()
        except UnicodeError:
            msg = _('Invalid token header. Token string should not contain invalid characters.')
            raise exceptions.AuthenticationFailed(msg)

        return self.authenticate_credentials(key)

    def authenticate_credentials(self, key):
        """
        Look up the token by `key` and return `(user, token)`.

        Raises `AuthenticationFailed` when the token does not exist or its
        user is not active.
        """
        token_model = self.get_model()
        try:
            token = token_model.objects.select_related('user').get(key=key)
        except token_model.DoesNotExist:
            raise exceptions.AuthenticationFailed(_('Invalid token.'))

        owner = token.user
        if not owner.is_active:
            raise exceptions.AuthenticationFailed(_('User inactive or deleted.'))

        return (owner, token)

    def authenticate_header(self, request):
        """Return the keyword used as the `WWW-Authenticate` value."""
        return self.keyword


# Authentication based on a remote user (a dedicated, external user
# management / authentication server).
class RemoteUserAuthentication(BaseAuthentication):
    """
    REMOTE_USER authentication.

    To use this, set up your web server to perform authentication, which will
    set the REMOTE_USER environment variable. You will need to have
    'django.contrib.auth.backends.RemoteUserBackend' in your
    AUTHENTICATION_BACKENDS setting.
    """

    # Name of request header to grab username from.  This will be the key as
    # used in the request.META dictionary, i.e. the normalization of headers to
    # all uppercase and the addition of "HTTP_" prefix apply.
    header = "REMOTE_USER"

    def authenticate(self, request):
        """
        Return `(user, None)` when the value found under `self.header` in
        `request.META` resolves to an active user; otherwise return `None`.
        """
        remote_username = request.META.get(self.header)
        user = authenticate(remote_user=remote_username)
        if not user or not user.is_active:
            return None
        return (user, None)

SessionAuthentication 会对已登录用户的请求强制进行 CSRF 校验:如果请求未携带有效的 CSRF 令牌,则会引发 403(Permission Denied)错误;未登录(匿名)的请求不做 CSRF 校验。

REST framework 默认配置

1
2
3
4
5
6
# Authentication classes used by views that do not set their own
# `authentication_classes`; listed in the order given here.
REST_FRAMEWORK = {
    'DEFAULT_AUTHENTICATION_CLASSES': [
        'rest_framework.authentication.SessionAuthentication',
        'rest_framework.authentication.BasicAuthentication',
    ],
}

自定义Token认证

定义一个用户表和一个保存用户Token的表:

1
2
3
4
5
6
7
8
9
10
11
12
class UserInfo(models.Model):
    """User record for the custom token authentication example."""

    username = models.CharField(max_length=16)
    # NOTE(review): a plain CharField stores whatever string it is given;
    # confirm the password is hashed before it reaches this column.
    password = models.CharField(max_length=32)
    # User tier: 1 = regular user (default), 2 = VIP user.
    type = models.SmallIntegerField(
        choices=(
            (1, '普通用户'),
            (2, 'VIP用户'),
        ),
        default=1,
    )


class Token(models.Model):
    """Token issued to a user; each user owns at most one token row."""

    # `on_delete` is a required argument since Django 2.0. CASCADE is what
    # older versions applied implicitly, so deleting a user removes the token.
    user = models.OneToOneField(to='UserInfo', on_delete=models.CASCADE)
    token_code = models.CharField(max_length=128)

定义一个登录视图:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
def get_random_token(username):
    """
    Generate a random authentication token.

    The token comes from the operating system's cryptographically secure
    random source instead of an MD5 digest of the username and current time,
    which an attacker who knows the username and the approximate login time
    could reproduce.

    :param username: kept for backward compatibility with existing callers;
        it no longer influences the generated value.
    :return: a 32-character lowercase hexadecimal string (the same length and
        alphabet as the previous MD5 hex digest).
    """
    import secrets

    # 16 random bytes -> 32 hex characters.
    return secrets.token_hex(16)


class LoginView(APIView):
    """
    Check the submitted username/password and issue a token on success.
    """

    def post(self, request):
        """
        Handle a login attempt.

        Reads `username` and `password` from the request data. On a match,
        stores a fresh token for the user (creating or updating the user's
        Token row) and responds with `{"code": 0, "token": ...}`. Otherwise
        responds with `{"code": 1, "error": ...}`.
        """
        # The request payload is deliberately not printed or logged here: it
        # contains the submitted password.
        username = request.data.get("username")
        password = request.data.get("password")

        # NOTE(review): the submitted password is matched directly against
        # the stored column value; confirm how passwords are stored.
        user = models.UserInfo.objects.filter(username=username, password=password).first()
        if not user:
            return Response({"code": 1, "error": "用户名或密码错误"})

        # Credentials are valid: issue a new token, replacing any previous one.
        token = get_random_token(username)
        models.Token.objects.update_or_create(defaults={"token_code": token}, user=user)
        return Response({"code": 0, "token": token})

定义一个认证类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from rest_framework.authentication import BaseAuthentication
from rest_framework.exceptions import AuthenticationFailed

class MyAuth(BaseAuthentication):
    """
    Token authentication that reads the token from the request body.

    Only write requests (POST, PUT, PATCH, DELETE) must carry a token.
    """

    # HTTP methods that must present a valid token. PATCH is included so that
    # partial updates are protected the same way as PUT.
    token_required_methods = ("POST", "PUT", "PATCH", "DELETE")

    def authenticate(self, request):
        """
        Authenticate the request; must be implemented by every authenticator.

        Returns `(user, token_obj)` for a write request carrying a valid
        token, and `(None, None)` for any other method.

        Raises `AuthenticationFailed` when a write request has no token or an
        unknown token.
        """
        if request.method not in self.token_required_methods:
            # NOTE(review): returning a tuple here (instead of a bare None)
            # leaves request.user set to None, which is kept unchanged for
            # compatibility with code that checks `request.user` for
            # truthiness.
            return None, None

        request_token = request.data.get("token", None)
        if not request_token:
            raise AuthenticationFailed('缺少token')

        token_obj = models.Token.objects.filter(token_code=request_token).first()
        if not token_obj:
            raise AuthenticationFailed('无效的token')

        # Hand back the user object (not just its username) so that later
        # checks can read attributes such as `request.user.type`.
        return token_obj.user, token_obj

认证配置

视图级别认证
1
2
3
4
5
# View-level authentication: this viewset names its authenticator explicitly
# through `authentication_classes`.
class CommentViewSet(ModelViewSet):
    queryset = models.Comment.objects.all()
    serializer_class = app01_serializers.CommentSerializer
    # Requests to this viewset are authenticated by MyAuth.
    authentication_classes = [MyAuth]
全局级别认证
1
2
3
4
# Configure in settings.py: project-wide default authentication class.
REST_FRAMEWORK = {
    "DEFAULT_AUTHENTICATION_CLASSES": [
        "app01.utils.MyAuth",
    ],
}

二、权限组件

示例场景:某些内容只有 VIP 用户才能访问。

默认配置

1
2
3
4
5
# Default permission policy: AllowAny is the only permission class listed.
REST_FRAMEWORK = {
    'DEFAULT_PERMISSION_CLASSES': [
        'rest_framework.permissions.AllowAny',
    ],
}

自定义一个权限类

1
2
3
4
5
6
7
8
9
10
11
12
# Custom permission: only VIP users are allowed.
class MyPermission(BasePermission):
    # Detail message for requests rejected by this permission.
    message = 'VIP用户才能访问'

    def has_permission(self, request, view):
        """
        Return True only when the requesting user is a VIP (`type == 2`).

        Authentication runs before permission checks, so `request.user` is
        already populated here. It may be `None`, an anonymous user, or any
        other object without a `type` attribute; all of those are denied
        instead of raising AttributeError.
        """
        return getattr(request.user, 'type', None) == 2

配置

视图级别配置
1
2
3
4
5
# View-level configuration: this viewset names both its authenticator and its
# permission class explicitly.
class CommentViewSet(ModelViewSet):
    queryset = models.Comment.objects.all()
    serializer_class = app01_serializers.CommentSerializer
    # Requests are authenticated by MyAuth ...
    authentication_classes = [MyAuth]
    # ... and then checked against MyPermission.
    permission_classes = [MyPermission]
全局级别设置
1
2
3
4
5
# REST framework options in settings.py: project-wide default authentication
# and permission classes.
REST_FRAMEWORK = {
    "DEFAULT_AUTHENTICATION_CLASSES": [
        "app01.utils.MyAuth",
    ],
    "DEFAULT_PERMISSION_CLASSES": [
        "app01.utils.MyPermission",
    ],
}

Django REST framework 认证和权限组件
https://flepeng.github.io/021-Python-32-框架-Django-REST-framework-Django-REST-framework-认证和权限组件/
作者
Lepeng
发布于
2021年8月18日
许可协议