DRF–day5 1. DRF三大认证组件 认证组件
认证入口:self.perform_authentication(request)
作用:主要是用来校验当前访问者的身份信息 游客 合法用户 非法用户
游客:代表校验通过,直接进入下一步校验
合法用户:校验通过,可以进入下一步校验
非法用户:校验失败,抛出异常。返回非法用户
权限组件
权限入口:self.check_permissions(request)
作用:检查用户所拥有的权限,某些操作必须登录才能访问,所有用户、登录可写,游客只读、自定义角色
认证通过:可以进入到下一步 频率
认证失败:直接抛出异常 返回403权限异常
频率组件
频率入口:self.check_throttles(request)
作用:限制请求在规定时间内访问某个接口的次数
没有达到:可以正常访问
达到限次:限制时间内禁止访问该接口,限制时间超过后,可以正常访问
Django的权限设计
自定义权限表 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 from django.db import modelsfrom django.contrib.auth.models import AbstractUserclass User (AbstractUser ): phone = models.CharField(max_length=11 , unique=True ) class Meta : db_table = "api_user" verbose_name = "用户" verbose_name_plural = verbose_name def __str__ (self ): return self.username AUTH_USER_MODEL = "api.User"
1 2 3 4 5 # 自定义用户表后迁移出现问题 解决办法如下 1. 删除django.contrib下的admin、auth中的迁移记录 app中的迁移记录2. 卸载django重装3. 删除原有的sqllite文件4. 重新执行迁移命令
2. 认证模块
认证请求的身份
1 2 3 4 5 # 权限认证流程 1. 认证方法的入口:`self.perform_authentication(request)` ,开始执行认证相关的功能,返回用户信息2. 在request类中的`user` 方法属性保存了访问的用户信息,在`user` 方法中调用了`self._authenticate()` 来认证用户的信息3. 在`def _authenticate(self):` 内部调用认证器类来完成了用户信息的认证4. 遍历drf配置的一个个认证器对象,并调用认证器对象的`authenticate(self)` 方法来对用户信息进行解析
认证类源码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 def _authenticate (self ): """ Attempt to authenticate the request using each authentication instance in turn. """ for authenticator in self.authenticators: try : user_auth_tuple = authenticator.authenticate(self) except exceptions.APIException: self._not_authenticated() raise if user_auth_tuple is not None : self._authenticator = authenticator self.user, self.auth = user_auth_tuple return self._not_authenticated()
SessionAuthentication 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 class SessionAuthentication (BaseAuthentication ): """ Use Django's session framework for authentication. """ def authenticate (self, request ): """ Returns a `User` if the request session currently has a logged in user. Otherwise returns `None`. """ user = getattr (request._request, 'user' , None ) if not user or not user.is_active: return None self.enforce_csrf(request) return (user, None )
BasicAuthentication 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 class BasicAuthentication (BaseAuthentication ): """ HTTP Basic authentication against username/password. """ www_authenticate_realm = 'api' def authenticate (self, request ): """ Returns a `User` if a correct username and password have been supplied using HTTP Basic authentication. Otherwise returns `None`. """ auth = get_authorization_header(request).split() if not auth or auth[0 ].lower() != b'basic' : return None if len (auth) == 1 : msg = _('Invalid basic header. No credentials provided.' ) raise exceptions.AuthenticationFailed(msg) elif len (auth) > 2 : msg = _('Invalid basic header. Credentials string should not contain spaces.' ) raise exceptions.AuthenticationFailed(msg) try : auth_parts = base64.b64decode(auth[1 ]).decode(HTTP_HEADER_ENCODING).partition(':' ) except (TypeError, UnicodeDecodeError, binascii.Error): msg = _('Invalid basic header. Credentials not correctly base64 encoded.' ) raise exceptions.AuthenticationFailed(msg) userid, password = auth_parts[0 ], auth_parts[2 ] return self.authenticate_credentials(userid, password, request)
自定义认证类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 from rest_framework import exceptionsfrom rest_framework.authentication import BaseAuthenticationfrom api.models import User""" 1. 继承BaseAuthentication类 2. 重写authenticate方法 3. 自定义认证规则 没有认证信息返回None (游客) 有认证信息但不符合要求 (非法) 有认证信息且认证成功 返回认证用户与信息 返回的格式一定是元祖 (合法用户) """ class MyAuth (BaseAuthentication ): """ 前端发送请求必须携带 认证信息 需要按照一定的格式来 默认使用Authorization来携带认证信息 认证信息都包含在 request.META中 """ def authenticate (self, request ): auth = request.META.get('HTTP_AUTHORIZATION' , None ) print (auth) if auth is None : return None auth_split = auth.split() if not (len (auth_split) == 2 and auth_split[0 ].lower() == "auth" ): raise exceptions.AuthenticationFailed("认证信息有误,认证失败" ) if auth_split[1 ] != "abc.admin1.123" : raise exceptions.AuthenticationFailed("用户信息认证失败" ) user = User.objects.filter (username="admin1" ).first() if not user: raise exceptions.AuthenticationFailed("用户不存在或者已删除" ) return user, None
3. 权限组件
入口:self.check_permissions(request)
源码剖析 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 def check_permissions (self, request ): """ Check if the request should be permitted. Raises an appropriate exception if the request is not permitted. """ for permission in self.get_permissions(): if not permission.has_permission(request, self): self.permission_denied( request, message=getattr (permission, 'message' , None ) )
系统的权限类 1 2 3 4 5 6 7 8 9 1. AllowAny 允许任何人任何请求访问 2. IsAuthenticated 只允许认证成功的用户访问 非法用户以及游客无权访问 3. IsAdminUser 只允许超级管理员访问 游客 普通用户无法访问 4. IsAuthenticatedOrReadOnly 认证规则: 已经认证成功的用户可以正常操作 游客只读
使用方式 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 REST_FRAMEWORK = { 'DEFAULT_AUTHENTICATION_CLASSES' : [ 'rest_framework.authentication.SessionAuthentication' , 'rest_framework.authentication.BasicAuthentication' , ], 'DEFAULT_PERMISSION_CLASSES' : [ 'rest_framework.permissions.AllowAny' , ], } class UserAPIView (APIView ): authentication_classes = [MyAuth] permission_classes = [IsAuthenticatedOrReadOnly]
自定义权限 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 from rest_framework.permissions import BasePermissionfrom api.models import Userclass MyPermission (BasePermission ): """ 登陆可写 游客只读、 有权限返回True 无权限返回False """ def has_permission (self, request, view ): if request.method in ("GET" , "HEAD" , "OPTIONS" ): return True username = request.data.get("username" ) user = User.objects.filter (username=username).first() print (user) if user: return True return False
4. 频率组件
入口:self.check_throttles(request)
频率组件源码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 def check_throttles (self, request ): """ Check if request should be throttled. Raises an appropriate exception if the request is throttled. """ throttle_durations = [] for throttle in self.get_throttles(): if not throttle.allow_request(request, self): throttle_durations.append(throttle.wait()) if throttle_durations: self.throttled(request, max (throttle_durations))
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 def allow_request (self, request, view ): """ Implement the check to see if the request should be throttled. On success calls `throttle_success`. On failure calls `throttle_failure`. """ if self.rate is None : return True self.key = self.get_cache_key(request, view) if self.key is None : return True self.history = self.cache.get(self.key, []) self.now = self.timer() while self.history and self.history[-1 ] <= self.now - self.duration: self.history.pop() if len (self.history) >= self.num_requests: return self.throttle_failure() return self.throttle_success()
使用方式 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 REST_FRAMEWORK = { ...... 'DEFAULT_THROTTLE_CLASSES' : [ 'api.throttle.SendMessageRate' , ], 'DEFAULT_THROTTLE_RATES' : { 'anon' : '3/m' , 'user' : '10/day' , 'send' : '1/m' , } } throttle_classes = [SendMessageRate]
自定义频率类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 from rest_framework.throttling import SimpleRateThrottleclass SendMessageRate (SimpleRateThrottle ): scope = "anon" def get_cache_key (self, request, view ): phone = request.query_params.get("phone" ) if not phone: return None return phone
作业 1 2 3 1. 掌握RBAC设计思想2. 理解三大认证模块源码3. 掌握三大认证模块自定义以及使用方式