Published in:2024-10-24 |

DRF–day5

1. DRF三大认证组件

认证组件

认证入口:self.perform_authentication(request)

作用:主要是用来校验当前访问者的身份信息 游客 合法用户 非法用户

游客:代表校验通过,直接进入下一步校验

合法用户:校验通过,可以进入下一步校验

非法用户:校验失败,抛出异常。返回非法用户

权限组件

权限入口:self.check_permissions(request)

作用:检查用户所拥有的权限,某些操作必须登录才能访问,所有用户、登录可写,游客只读、自定义角色

认证通过:可以进入到下一步 频率

认证失败:直接抛出异常 返回403权限异常

频率组件

频率入口:self.check_throttles(request)

作用:限制请求在规定时间内访问某个接口的次数

没有达到:可以正常访问

达到限次:限制时间内禁止访问该接口,限制时间超过后,可以正常访问

Django的权限设计

1604285192877

自定义权限表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from django.db import models
from django.contrib.auth.models import AbstractUser


class User(AbstractUser):
phone = models.CharField(max_length=11, unique=True)

class Meta:
db_table = "api_user"
verbose_name = "用户"
verbose_name_plural = verbose_name

def __str__(self):
return self.username

# 配置django默认的用户表 “app名字.模型名”
AUTH_USER_MODEL = "api.User"
1
2
3
4
5
# 自定义用户表后迁移出现问题  解决办法如下
1. 删除django.contrib下的admin、auth中的迁移记录 app中的迁移记录
2. 卸载django重装
3. 删除原有的sqllite文件
4. 重新执行迁移命令

2. 认证模块

认证请求的身份

1
2
3
4
5
# 权限认证流程
1. 认证方法的入口:`self.perform_authentication(request)`,开始执行认证相关的功能,返回用户信息
2. 在request类中的`user`方法属性保存了访问的用户信息,在`user`方法中调用了`self._authenticate()`来认证用户的信息
3.`def _authenticate(self):`内部调用认证器类来完成了用户信息的认证
4. 遍历drf配置的一个个认证器对象,并调用认证器对象的`authenticate(self)`方法来对用户信息进行解析

认证类源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
def _authenticate(self):
"""
Attempt to authenticate the request using each authentication instance
in turn.
"""
# 遍历认证器 获取到每一个认证器进行认证 self.authenticators:认证器对象
for authenticator in self.authenticators:
try:
# 认证类方法包含两个参数: self是request对象 认证器本身
user_auth_tuple = authenticator.authenticate(self)
except exceptions.APIException:
self._not_authenticated()
raise

# 对于认证后的返回值的处理:如果有返回值 就将认证出的用户信息分别保存到self.request self.auth
if user_auth_tuple is not None:
self._authenticator = authenticator
# reques.user request.auth
self.user, self.auth = user_auth_tuple
return

# 如果认证器认证出的信息为 None
self._not_authenticated()

SessionAuthentication

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class SessionAuthentication(BaseAuthentication):
"""
Use Django's session framework for authentication.
"""

def authenticate(self, request):
"""
Returns a `User` if the request session currently has a logged in user.
Otherwise returns `None`.
"""

# Get the session-based user from the underlying HttpRequest object
# 从当前请求中解析出用户 如果用户不存在为None
user = getattr(request._request, 'user', None)

# Unauthenticated, CSRF validation not required
if not user or not user.is_active:
# 如果用户不存在 或者用户状态不满足 返回None
return None

# 解析出用户后 基于session的认证要重新启动csrf认证
# 如果csrf认证失败 出现异常 返回非法用户
self.enforce_csrf(request)

# CSRF passed with authenticated user
return (user, None)

BasicAuthentication

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
class BasicAuthentication(BaseAuthentication):
"""
HTTP Basic authentication against username/password.
"""
www_authenticate_realm = 'api'

def authenticate(self, request):
"""
Returns a `User` if a correct username and password have been supplied
using HTTP Basic authentication. Otherwise returns `None`.
"""
# 解析出的 认证信息 该认证信息是两段式 "basic 认证字符串"
auth = get_authorization_header(request).split()

if not auth or auth[0].lower() != b'basic':
return None

# 如果认证信息存在但格式不对 则抛出异常
if len(auth) == 1:
msg = _('Invalid basic header. No credentials provided.')
raise exceptions.AuthenticationFailed(msg)
elif len(auth) > 2:
msg = _('Invalid basic header. Credentials string should not contain spaces.')
raise exceptions.AuthenticationFailed(msg)

try:
auth_parts = base64.b64decode(auth[1]).decode(HTTP_HEADER_ENCODING).partition(':')
except (TypeError, UnicodeDecodeError, binascii.Error):
msg = _('Invalid basic header. Credentials not correctly base64 encoded.')
raise exceptions.AuthenticationFailed(msg)

# 从认证信息中解析出用户id 密码 进一步得到用户对象
userid, password = auth_parts[0], auth_parts[2]
# return (user, None)
return self.authenticate_credentials(userid, password, request)

自定义认证类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
from rest_framework import exceptions
from rest_framework.authentication import BaseAuthentication

from api.models import User

"""
1. 继承BaseAuthentication类
2. 重写authenticate方法
3. 自定义认证规则
没有认证信息返回None (游客)
有认证信息但不符合要求 (非法)
有认证信息且认证成功 返回认证用户与信息 返回的格式一定是元祖 (合法用户)
"""


class MyAuth(BaseAuthentication):
"""
前端发送请求必须携带 认证信息 需要按照一定的格式来
默认使用Authorization来携带认证信息
认证信息都包含在 request.META中
"""

def authenticate(self, request):
# 获取认证信息
auth = request.META.get('HTTP_AUTHORIZATION', None)
print(auth)

if auth is None:
# 代表游客
return None

# 设置认证信息的校验规则 "auth 认证信息"
auth_split = auth.split()

# 校验规则
if not (len(auth_split) == 2 and auth_split[0].lower() == "auth"):
raise exceptions.AuthenticationFailed("认证信息有误,认证失败")

# 如果认证成功,开始解析用户 规定用户信息必须是 abc.admin.123
if auth_split[1] != "abc.admin1.123":
raise exceptions.AuthenticationFailed("用户信息认证失败")

# 校验数据库是否存在此用户
user = User.objects.filter(username="admin1").first()

if not user:
raise exceptions.AuthenticationFailed("用户不存在或者已删除")

return user, None

3. 权限组件

入口:self.check_permissions(request)

源码剖析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 权限组件的核心方法
def check_permissions(self, request):
"""
Check if the request should be permitted.
Raises an appropriate exception if the request is not permitted.
"""
# 遍历权限类获取到一个个的权限对象 进行权限认证
for permission in self.get_permissions():
# 参数:权限对象self 请求对象request
# 如果有权限则返回True 如果没有权限则返回False
# 通过权限类的has_permission()方法完成的权限的认证
if not permission.has_permission(request, self):
self.permission_denied(
request, message=getattr(permission, 'message', None)
)

系统的权限类

1
2
3
4
5
6
7
8
9
1. AllowAny
允许任何人任何请求访问
2. IsAuthenticated
只允许认证成功的用户访问 非法用户以及游客无权访问

3. IsAdminUser
只允许超级管理员访问 游客 普通用户无法访问
4. IsAuthenticatedOrReadOnly
认证规则: 已经认证成功的用户可以正常操作 游客只读

使用方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 全局配置
REST_FRAMEWORK = {
# DRF默认的权限认证类
'DEFAULT_AUTHENTICATION_CLASSES': [
# 默认的认证器
'rest_framework.authentication.SessionAuthentication', # 基于session
'rest_framework.authentication.BasicAuthentication', # Basic
# 'api.authentications.MyAuth',
],
# DRF默认的权限配置
'DEFAULT_PERMISSION_CLASSES': [
'rest_framework.permissions.AllowAny',
],
}
# 局部配置
class UserAPIView(APIView):
authentication_classes = [MyAuth]
# 权限组件的功能依赖于认证组件 否则无法工作
permission_classes = [IsAuthenticatedOrReadOnly]

自定义权限

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from rest_framework.permissions import BasePermission

from api.models import User


class MyPermission(BasePermission):
"""
登陆可写 游客只读、
有权限返回True 无权限返回False
"""

def has_permission(self, request, view):
# 如果是只读 则所有人都可以访问
if request.method in ("GET", "HEAD", "OPTIONS"):
return True
username = request.data.get("username")
# 如果用户访问的是写操作 判断用户是否有登陆信息
user = User.objects.filter(username=username).first()
print(user)
if user:
return True
return False

4. 频率组件

入口:self.check_throttles(request)

频率组件源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def check_throttles(self, request):
"""
Check if request should be throttled.
Raises an appropriate exception if the request is throttled.
"""

# 1.遍历配置的频率认证类,初始化得到一个个的频率类对象 会调用的认证类的__init__方法
# 2.调用频率认证类的allow_request(request, self)方法来判断当前请求是否被限次(x限次不能访问)
# 3.频率认证类会在限次后返回Flase,调用wait()方法,返回下一次可以访问此接口的剩余时间
# 注意:所有的认证类都继承了SimpleRateThrottle
throttle_durations = []
# 遍历频率类获取到一个个的频率类对象来进行请求的限制
for throttle in self.get_throttles():
# 调用认证类的allow_request来进行请求的限次 限次了返回False
if not throttle.allow_request(request, self):
# 一旦当前请求被限次了 返回False 调用了wait() 返回下一次可以访问此接口的剩余时间
throttle_durations.append(throttle.wait())

if throttle_durations:
self.throttled(request, max(throttle_durations))

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
def allow_request(self, request, view):
"""
Implement the check to see if the request should be throttled.

On success calls `throttle_success`.
On failure calls `throttle_failure`.
"""
if self.rate is None:
return True

# get_cache_key获取当前用户的唯一标识 并存在缓存中
# throttle_18500001111_1
self.key = self.get_cache_key(request, view)
if self.key is None:
return True

# 获取当前self.key的访问信息
# 如果self.key是初次访问 self.history 为空[]
# self.history每次访问的时间 [16:21:56, 16:21:57, 16:21:58]
self.history = self.cache.get(self.key, [])
# 获取当前时间
self.now = self.timer()

# Drop any requests from the history which have now passed the
# throttle duration
# 判断当前事件减去单位时间 是不是大于最后一次访问时间
# 满足条件 则清除 不再累加
while self.history and self.history[-1] <= self.now - self.duration:
self.history.pop()

# 如果self.history大于 self.num_requests访问次数,则直接不允许访问
if len(self.history) >= self.num_requests:
# return False 不允许访问
return self.throttle_failure()
return self.throttle_success()

使用方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 全局配置
REST_FRAMEWORK = {
......
# DRF频率配置
'DEFAULT_THROTTLE_CLASSES': [
# 'rest_framework.throttling.UserRateThrottle',
'api.throttle.SendMessageRate',
],
'DEFAULT_THROTTLE_RATES': {
'anon': '3/m',
'user': '10/day',
'send': '1/m',
}
}

# 局部配置
throttle_classes = [SendMessageRate]

自定义频率类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from rest_framework.throttling import SimpleRateThrottle


class SendMessageRate(SimpleRateThrottle):
scope = "anon"

# 只对包含手机号的请求做验证
def get_cache_key(self, request, view):
phone = request.query_params.get("phone")

if not phone:
return None

# 返回数据 根据手机号动态展示返回的值
return phone

作业

1
2
3
1. 掌握RBAC设计思想
2. 理解三大认证模块源码
3. 掌握三大认证模块自定义以及使用方式
Prev:
Next: