1. 配置文件
1.1 介绍
from flask import Flask
app = Flask(__name__)
# 打印默认的配置信息
print(app.config)
flask中的配置文件是一个flask.config.Config对象(继承dice字典),
默认配置为;
{
# Debug模式, 修改为Debug模式, 修改代码会自动重启程序
;DEBUG;: get_debug_flag(default=False),
# 测试模式
;TESTING;: False,
;PROPAGATE_EXCEPTIONS;: None,
;PRESERVE_CONTEXT_ON_EXCEPTION;: None,
# 密钥, 如session加密使用
;SECRET_KEY;: None,
# session过期时间, 默认为31天
;PERMANENT_SESSION_LIFETIME;: timedelta(days=31),
;USE_X_SENDFILE;: False,
;LOGGER_NAME;: None,
;LOGGER_HANDLER_POLICY;: ;always;,
;SERVER_NAME;: None,
;APPLICATION_ROOT;: None,
# session的cookie保存在浏览器上的key
;SESSION_COOKIE_NAME;: ;session;,
# session的cookie保存的域名
;SESSION_COOKIE_DOMAIN;: None,
# session的cookie保存的路径
;SESSION_COOKIE_PATH;: None,
# session的cookie是否只支持http传输
;SESSION_COOKIE_HTTPONLY;: True,
# session是否传输http
;SESSION_COOKIE_SECURE;: False,
# 会话刷新每个请求
;SESSION_REFRESH_EACH_REQUEST;: True,
;MAX_CONTENT_LENGTH;: None,
;SEND_FILE_MAX_AGE_DEFAULT;: timedelta(hours=12),
;TRAP_BAD_REQUEST_ERRORS;: False,
;TRAP_HTTP_EXCEPTIONS;: False,
;EXPLAIN_TEMPLATE_LOADING;: False,
;PREFERRED_URL_SCHEME;: ;http;,
;JSON_AS_ASCII;: True,
;JSON_SORT_KEYS;: True,
;JSONIFY_PRETTYPRINT_REGULAR;: True,
;JSONIFY_MIMETYPE;: ;application/json;,
;TEMPLATES_AUTO_RELOAD;: None,
}
1.2 配置方式
列举几中常用的配置方式.
1. app属性
直接给app对象赋值属性, 因为只能配置两项, debug和secret_key.
# main.py
from flask import Flask
app = Flask(__name__)
# 通过app属性设置
app.debug = True
;app.route(;/;)
def index():
return ;hello;
if __name__ == ;__main__;:
app.run()
2. 字典形式
以字典的形式;给flask配置文件做配置.
from flask import Flask
app = Flask(__name__)
# 以字典形式设置配置
app.config[;DEBUG;] = on
;app.route(;/;)
def index():
return ;hello;
if __name__ == ;__main__;:
app.run()
3. 配置文件
编写一个配置文件, 在Flask项目中使用app.config.from_pyfile里传递配置文件的路径.
* 1. 在项目目录下新建settings.py 配置文件,(名称随意, 符合命令规则即可).
# settings.py
DEBUG = True
* 2. 在Flask项目中使用app.app.config.from_pyfile方法加载配置.
from flask import Flask
app = Flask(__name__)
# 加载配置文件
app.config.from_pyfile(;settings.py;)
;app.route(;/;)
def index():
return ;hello;
if __name__ == ;__main__;:
app.run()
4. 以类的形式
编写一个配置文件, 以类的形式编写配置, 一个文件可以用多个套配置;这样减少测试与更改(推荐使用),
在Flask项目中使用app.config.from_object(;python类或类的路径;)加载配置.
* 1. 在项目目录下新建setobj.py 配置文件,(名称随意, 符合命令规则即可).
# setobj.py
# 上线使用
class Config(object):
DEBUG = False
TESTING = False
# 测试使用
class TestingConfig(Config):
DEBUG = True
* 2. 在Flask项目中使用app.config.from_object()方法加载配置.
from flask import Flask
app = Flask(__name__)
# 文件.类名, 使用该类中的配置
app.config.from_object(;setobj.TestingConfig;)
;app.route(;/;)
def index():
return ;hello;
if __name__ == ;__main__;:
app.run()
2. flask使用session
session[;键;] = 值
flask内置session通过SECRET_KEY的密钥进行加密后, 当做cookie返回给浏览器.
下次发送请求, 会携带cookie过来, 返解, 在放到session中.
session.get(;key;) 将值取出.
# session的相关配置
# 密钥, 如session加密使用
;SECRET_KEY;: None,
# session过期时间, 默认为31天
;PERMANENT_SESSION_LIFETIME;: timedelta(days=31),
# session的cookie保存在浏览器上的名称
;SESSION_COOKIE_NAME;: ;session;,
# session的cookie保存的域名
;SESSION_COOKIE_DOMAIN;: None,
# session的cookie保存的路径
;SESSION_COOKIE_PATH;: None,
# session的cookie是否只支持http传输
;SESSION_COOKIE_HTTPONLY;: True,
# session是否传输http
;SESSION_COOKIE_SECURE;: False,
# 会话刷新每个请求
;SESSION_REFRESH_EACH_REQUEST;: True,
from flask import Flask, session
app = Flask(__name__)
# 设置密钥, 使用session必须使用
app.secret_key = ;asdasdafa;
;app.route(;/login;)
def login():
# 保存session
session[;user;] = ;kid;
# 设置cookie中的名称
app.config[;SESSION_COOKIE_NAME;] = ;session;
return ;登入成功!;
;app.route(;/index;)
def index():
# 获取session
user = session.get(;user;)
print(f;session的信息: {user};)
return ;index主页面;
# 启动
if __name__ == ;__main__;:
app.run()
在浏览器中先访问: http://127.0.0.1:5000/login 将用户信息保存到cookie中
再访问: http://127.0.0.1:5000/login 将用户信息取出
3. CBV
Flask代码可以使用CBV方式写, 视图类需要继承view或其子类.
3.1 view类源码
# ps补充点, 修改函数名
def view():
# view
print(view.__name__)
# 修改名称
view.__name__ = ;qwe;
# qwe
print(view.__name__)
# 执行view函数
view()
# view类源码
class View:
# 接收的请求
methods: t.ClassVar[t.Optional[t.Collection[str]]] = None
# 提供自动选项
provide_automatic_options: t.ClassVar[t.Optional[bool]] = None
# 使用的装饰器
decorators: t.ClassVar[t.List[t.Callable]] = []
# 初始化每个请求
init_every_request: t.ClassVar[bool] = True
# 必须重写dispatch_request方法处理每个请求
def dispatch_request(self) -> ft.ResponseReturnValue:
raise NotImplementedError()
# 静态方法, 类.as_view() 将类名作为第一个参数进行传递
;classmethod
def as_view(
# 类名 别名(必须提供)
cls, name: str, *class_args: t.Any, **class_kwargs: t.Any
) -> ft.RouteCallable:
# 初始化请求
if cls.init_every_request:
# 定义view函数接收所有关键字参数
def view(**kwargs: t.Any) -> ft.ResponseReturnValue:
self = view.view_class( # type: ignore[attr-defined]
*class_args, **class_kwargs
)
return current_app.ensure_sync(self.dispatch_request)(**kwargs)
else:
self = cls(*class_args, **class_kwargs)
# 定义view函数接收所有关键字参数
def view(**kwargs: t.Any) -> ft.ResponseReturnValue:
return current_app.ensure_sync(self.dispatch_request)(**kwargs)
# 执行view() 先执行self.dispatch_request处理请求, 在通过current_app.ensure_sync返回
# 装饰器
if cls.decorators:
# 修改view的名称为 name
view.__name__ = name
view.__module__ = cls.__module__
# 将decorators中的装饰器遍历取出, 最前面的装饰器装饰请求方法, 后面的装饰器装饰前一个装饰器.
for decorator in cls.decorators:
view = decorator(view)
view.view_class = cls
# 修改view的名称为 name
view.__name__ = name
view.__doc__ = cls.__doc__
view.__module__ = cls.__module__
;;;
将自己设置的methods给view对象的methods属性,
如果自己没有定义去父类view中查找None, (自己没设置为None)
默认仅支持GET.
在执行到添加路由 app.add_url_rule时
# 从路由中pop出设置的请求方法
methods = options.pop(;methods;, None)
# 如果路由装饰器中没有设置去视图类中获取methods属性的值,也没有设置的话 值为GET.
if methods is None:
methods = getattr(view_func, ;methods;, None) or (;GET;,)
...
;;;
view.methods = cls.methods # type: ignore
view.provide_automatic_options = cls.provide_automatic_options
# 将view返回
return view
;;;
如果没有view.__name__ = name, 那么没有设置endpoint参数的时候在执行_endpoint_from_view_func函数获取
被路由绑定的函数的名称, 都是view, 如果有多个应用, 就会会出现重名..
;;;
3.2 继承view类
继承view类需要重写dispatch_request方法, 从request中获取请求方式, 并写对应的请求方法请求请求.
如果需要限制请求方法, 在类中设置methods属性的参数即可.
如果需要使用装饰器则, 在类中设置decorator属性的参数即可.
from flask import Flask, request
from flask import views
app = Flask(__name__)
# 模拟装饰器
def login(func):
def inner(*args, **kwargs):
print(;执行装饰器;)
res = func(*args, **kwargs)
return res
return inner
class IndexView(views.View):
# methods属性, 值是字符串
methods = [;get;, ;post;]
# 设置装饰器, 值是装饰器名
decorators = [login]
# 需要重写dispatch_request方法, 处理请求
def dispatch_request(self):
if request.method == ;GET;:
return self.get()
# 请求方法
def get(self):
print(;执行get请求;)
return ;get;
# (注册路由, 执行的视图类as_view(;路由别名;) 将view返回到这里 view的名字是index
app.add_url_rule(;/index;, view_func=IndexView.as_view(;index;))
if __name__ == ;__main__;:
app.run()
在Postman中测试, 输入地址: http://127.0.0.1:5000/index (分别以不同的请求方式访问)
访问成功:
页面显示 get
终端显示 执行装饰器 (换行) 执行get请求
3.3 MethodView类源码
MethodView继承了View类, 重写了dispatch_request方法, methods属性也有默认值.
# MethodView
class MethodView(View):
# 子类向父类传给信息, 接收继承MethodView的类的信息, cls为子类类名
def __init_subclass__(cls, **kwargs: t.Any) -> None:
# View类继续接收
super().__init_subclass__(**kwargs)
# methods 不是MethodView子类的属性
if ;methods; not in cls.__dict__:
# methods为一个空字典
methods = set()
# 将从所有父类中获取methods属性的值
for base in cls.__bases__:
if getattr(base, ;methods;, None):
# 将获取的值添加到字段中
methods.update(base.methods)
;;;
http_method_funcs = frozenset(
[;get;, ;post;, ;head;, ;options;, ;delete;, ;put;, ;trace;, ;patch;])
frozenset 不可变集合
;;;
for key in http_method_funcs:
# 将所有http_method_funcs的值转为大写添加到methods字典中
if hasattr(cls, key):
methods.add(key.upper())
if methods:
# 将methods的值赋值给子类的methods方法
cls.methods = methods
# 重写dispatch_request方法
def dispatch_request(self, **kwargs: t.Any) -> ft.ResponseReturnValue:
# 将请求方式转为小写, 通过getattr获取对应的请求方法赋值给meth
meth = getattr(self, request.method.lower(), None)
# 没有对应的请求方法, and request.method == ;HEAD;
if meth is None and request.method == ;HEAD;:
# 获取get方法赋值给meth
meth = getattr(self, ;get;, None)
# 断言, meth不为空
assert meth is not None, f;Unimplemented method {request.method!r};
# 获取对应的请求方法后, 传递给 current_app.ensure_sync
return current_app.ensure_sync(meth)(**kwargs)
3.4 继承MethodView类
继承MethodView开发时可以省去重新dispatch_request方法, 只写对应的请求方法即可.
如果需要限制请求方法, 在类中设置methods属性的参数即可.
如果需要使用装饰器则, 在类中设置decorator属性的参数即可.
from flask import Flask
from flask import views
app = Flask(__name__)
# 模拟装饰器
def login(func):
def inner(*args, **kwargs):
print(;执行装饰器;)
res = func(*args, **kwargs)
return res
return inner
class IndexView(views.MethodView):
# 设置装饰器, 值是装饰器名
decorators = [login]
def get(self):
print(;执行get请求;)
return ;get;
# (注册路由, 执行的视图类as_view(;路由别名;)
app.add_url_rule(;/index;, view_func=IndexView.as_view(;index;))
if __name__ == ;__main__;:
app.run()
在Postman中测试, 输入地址: http://127.0.0.1:5000/index (分别以不同的请求方式访问)
访问成功:
页面显示 get
终端显示 执行装饰器 (换行) 执行get请求
4. 模板语法
django默认使用模板语法是DTL, 也可以改为jinja2.
flask与DTL的使用方法几乎相识, 区别: jinja2支持函数加括号执行
参考学习: https://blog.csdn.net/QQ_46137324/article/details/123337020
模板语法的书写方法;
{{ 变量名 }} 变量相关
{% %} 逻辑相关 for循环等
模板语法的取值格式:
1. 句点符 .键取值
2. 可以点索引
3. 两者混合使用
jinja2处理xss攻击:
模板层 要渲染的字符串|safe
后端 Markup(;<input type=;text;>;)
* Markup等价django的mark_safe
pycharm设置jinja2模板代码提示.
5. 请求响应
Flask的请求被封装到request函数中.
request请求方法
.method 请求方法
.args 获取get请求提交的数据
.form 获取form表单提交的数据
.values get请求与post请求的数据总和
.query_string 获取get请求提交的数据, 路由?后的参数, 字符串格式, 需要手动截数据
.cookie 获取cookie
.headers 请求头
.files 获取文件
.path 不带域名的, 请求路径
.full_path 不带域名, 带参数的请求路径
.script_root
.base_url 带域名带参数的请求路径
.url_root 域名
.host_url 域名
.host ip;端口
5.1 请求方法
.method 请求方法
from flask import Flask, request
app = Flask(__name__)
;app.route(;/;)
def index():
# 获取请求方法
print(request.method) # 大写的请求方法
return ;index;
if __name__ == ;__main__;:
app.run()
浏览器中测试, 输入: http://127.0.0.1:5000/
5.2 获取get提交数据
.args 获取get请求提交的数据
结果是一个ImmutableMultiDict不可变字典
通过 ImmutableMultiDict.get(key) 获取值
如果有多个值通过 ImmutableMultiDict.getlist(key), 得到一个列表, 保存多个值.
* 值只有一个使用.getlist(key) 效果和get(key)一样的.
from flask import Flask, request
app = Flask(__name__)
;app.route(;/;)
def index():
# 不可变字典
# ImmutableMultiDict([(;name;, ;kid;), (;name;, ;123;), (;pwd;, ;123;)])
print(request.args)
# 当做字典使用 结果为kid
print(request.args.get(;name;)) # 得到第一个name的值
# 获取所有name对应的值 [;kid;, ;123;]
print(request.args.getlist(;name;))
return ;index;
if __name__ == ;__main__;:
app.run()
浏览器中测试, 输入:http://127.0.0.1:5000/?name=kid&pwd=123&name=123
* name有连个值
5.3 获取form表单数据
.form 获取form表单提交的数据
from flask import Flask, request
app = Flask(__name__)
;app.route(;/;, methods=[;get;, ;post;])
def index():
# 通过Postman提交数据form-data数据, get post请求的结果是一样的
# ImmutableMultiDict([(;name;, ;123;), (;pwd;, ;123;)])
print(request.form)
return ;index;
if __name__ == ;__main__;:
app.run()
5.4 获取get/post请求数据总和
.values get请求与post请求的数据总和
from flask import Flask, request
app = Flask(__name__)
;app.route(;/;, methods=[;post;])
def index():
# 路由后面提交的值为一个字典 , 表单为另一个字典
# CombinedMultiDict([ImmutableMultiDict([(;name;, ;qq;), (;age;, ;18;)]), / ImmutableMultiDict([(;name;, ;kid;), (;pwd;, ;123;)])])
print(request.values)
return ;index;
if __name__ == ;__main__;:
app.run()
浏览器中测试, 输入: 127.0.0.1:5000?name=qq&age=18 提交方式为post, 携带form-data的数据.
如果提交给get请求, get请求携带数据, 请求提交就没有表单数据的数据了.
get请求没有携带数据, 提交表单数据可以.
get请求携带数据, 提交文件不影响.
5.5 Flask使用cookie
* 1. 先通过make_response类生成, response对象
* 2. response对象.set_cookie(;key;, ;value;)
* 3. 将response对象返回, 返回是会将cookie保存
* 4. request.cookies获取cookie, 值为不可变字典.
from flask import Flask, request, make_response
app = Flask(__name__)
;app.route(;/login;)
def login():
# 生成response对象
res = make_response(;index;)
# set_cookie(key, value)
res.set_cookie(;name;, ;kid;)
# 返回response对象
return res
;app.route(;/index;)
def index():
# 获取cookie, ImmutableMultiDict([(;name;, ;kid;)])
print(request.cookies)
return ;index;
if __name__ == ;__main__;:
app.run()
浏览器中测试:
先输入: 127.0.0.1:5000/login 保存cookie到客户端
再输入: 127.0.0.1:5000/index 获取请求携带cookie
5.6 获取请求头信息
from flask import Flask, request
app = Flask(__name__)
;app.route(;/;)
def index():
# 获取请求头
print(request.headers)
;;;
Cookie: 123
---上面是自己添加到请求头的信息, 下面是默认获取的信息----
User-Agent: PostmanRuntime/7.29.0
Accept: */*
Postman-Token: 8d2662e4-7ecd-4038-9c1d-22d8df6cf74d
Host: 127.0.0.1:5000
Accept-Encoding: gzip, deflate, br
Connection: keep-alive
;;;
return ;index;
if __name__ == ;__main__;:
app.run()
5.7 获取文件信息
通过表单提交文件, 后端读取文件, 保存到本地.
from flask import Flask, request
app = Flask(__name__)
;app.route(;/;)
def index():
# ImmutableMultiDict([(;file;, <FileStorage: ;2022-09-19_00899.bmp; (;image/bmp;)>)])
print(request.files)
# 获取文件
img_obj = request.files.get(;file;)
# 将文件保存到本地
with open(img_obj.filename, mode=;wb;) as wf:
for i in img_obj:
wf.write(i)
return ;index;
if __name__ == ;__main__;:
app.run()
5.8 路径相关
.path 不带域名的, 请求路径
;app.route(;/;) http://127.0.0.1:5000 得到: /
.full_path 不带域名, 带参数的请求路径
;app.route(;/;) http://127.0.0.1:5000/?name=kid 得到: /?name=kid
.base_url 带域名带请求路径, 不带参数
;app.route(;/index;) 127.0.0.1:5000/index?name=kid 得到: http://127.0.0.1:5000/index
.url_root 域名 得到: http://127.0.0.1:5000/
.host_url 域名 得到: http://127.0.0.1:5000/
.host ip:端口 得到: 127.0.0.1:5000
6. 响应对象
返回字符串信息: return ;字符串; 或 字符串格式html
返回html模板: return render_template(;html文件;, **loacls())
重定向: return redirect(;地址;)
返回json格式数据: return jsonify({;key;=;value;})
6.1 返回html页面
* 1. 在项目目录下新建templates目录
* 2. 在templates目录下新建index.html文件
<!DOCTYPE html>
<html lang=;en;>
<head>
<meta charset=;UTF-8;>
<title>Title</title>
</head>
<body>
{{ name }}
{{ age }}
</body>
</html>
* 3. 新建main.py 文件
from flask import Flask, render_template
app = Flask(__name__)
;app.route(;/;)
def index():
name = ;kid;
age = 18
print(locals()) # {;name;: ;kid;, ;age;: 18}
# locals() 获取当前名称空间的所有变量, 在使用** 将字典解压为 ket=value 关键字参数
return render_template(;index.html;, **locals())
if __name__ == ;__main__;:
app.run()
* 4. 浏览器中测试: 127.0.0.1:5000
6.2 返回json格式数据
rom flask import Flask, jsonify
app = Flask(__name__)
;app.route(;/;)
def index():
;;;
{
;name;: ;kid;
}
;;;
# return jsonify(name=;kid;)
;;;
{
;age;: 18
}
;;;
return jsonify({;age;: 18})
if __name__ == ;__main__;:
app.run()
6.3 返回response对象
见5.5小节返回response对象, 携带cookie.
7. session
7.1 session的使用
# session的使用
from flask import Flask, session
app = Flask(__name__)
app.secret_key = ;asd;
;;;
session 是通过 session;盐;secret_key 生成的
需要设置 key 否则会报错 ↓
RuntimeError: The session is unavailable because no secret key was set.
Set the secret_key on the application to something unique and secret.
;;;
;app.route(;/set_session;)
def set_session():
# 设置session
session[;name;] = ;kid;
return ;set_session;
;app.route(;/get_session;)
def get_session():
# session是一个不可变字典, 与字典的使用方式是一样的, 使用get获取不打值不会报错
name = session.get(;name;)
print(name)
return ;get_session;
;app.route(;/del_session;)
def delete():
# 按key pop掉
session.pop(;name;)
return ;delete_session;
if __name__ == ;__main__;:
app.run()
测试:
1. 先访问 set_session 保存session session[;name;] = ;kid;
2. 再访问 get_session 获取session session.get(;name;) 值为kid
3. 再访问 del_session 删除session session.pop(;name;)
4. 再访问 get_session 获取session session.get(;name;) 值为None
7.2 源码解析
# main.py
from flask import Flask, session
app = Flask(__name__)
app.secret_key = ;asd;
# session配置属性
app.session_interface
# app.py
# session的配置是一个 SessionInterface类的对象
session_interface: SessionInterface = SecureCookieSessionInterface()
# session.py session源码
class SecureCookieSessionInterface(SessionInterface):
#;应该应用于密钥顶部的salt,(加盐)
#;对基于cookie的session进行签名。
salt = ;cookie-session;
#;用于签名的哈希函数。默认值为sha1
digest_method = staticmethod(hashlib.sha1)
#;其危险支持的密钥派生的名称。默认值 is hmac.
key_derivation = ;hmac;
#;有效负载的python序列化程序。默认为紧凑型
#;JSON派生的序列化程序;支持一些额外的Python类型
#;例如datetime对象或元组。
# session的json序列化程序
serializer = session_json_serializer
# session对象
session_class = SecureCookieSession
# 生成签名
def get_signing_serializer(
self, app: ;Flask;
) -> t.Optional[URLSafeTimedSerializer]:
# 判断secret_ket 是否配置
if not app.secret_key:
return None
# 签名关键字参数 = dict(密钥派生的名称, 签名方式)
signer_kwargs = dict(
key_derivation=self.key_derivation, digest_method=self.digest_method
)
# 序列化方法(secret_key, 盐, 序列化方法, 签名关键字参数)得到session签名
return URLSafeTimedSerializer(
app.secret_key,
salt=self.salt,
serializer=self.serializer,
signer_kwargs=signer_kwargs,
)
# cookie转session
def open_session(
self, app: ;Flask;, request: ;Request;
) -> t.Optional[SecureCookieSession]:
# s = 签名方法
s = self.get_signing_serializer(app)
# 判断签名方法是或为空
if s is None:
return None
# val = SESSION_COOKIE_NAME 的配置信息, 默认为session, 去cookie中找键为session的值
val = request.cookies.get(self.get_cookie_name(app))
# 如果没有session, 则返回空的session对象 (为了后续使用session是session对象, 而不是None)
if not val:
return self.session_class()
# 超时时间 默认为31天, 将31天转为以秒为单位
max_age = int(app.permanent_session_lifetime.total_seconds())
# 序列化器的load方法, 将cookie中保存的session签名转为不可变字典的session
try:
data = s.loads(val, max_age=max_age)
return self.session_class(data)
# 错误签名(签名过期, cookie被改...) 返回一个空的session对象
except BadSignature:
return self.session_class()
# session转cookie
def save_session(
self, app: ;Flask;, session: SessionMixin, response: ;Response;
) -> None:
# 获取一个配置信息, 用于设置cookie的
name = self.get_cookie_name(app)
domain = self.get_cookie_domain(app)
path = self.get_cookie_path(app)
secure = self.get_cookie_secure(app)
samesite = self.get_cookie_samesite(app)
httponly = self.get_cookie_httponly(app)
# 如果会话修改为空;请删除cookie。
# 如果会话为空;则返回而不设置cookie。
;;;
session 为空 modified的值为True时触发, 将cookie的信息删除
modified是一个标准位, 为session设置值之后 modified的值为True
删除session session.pop() session为空时 modified的值为True 清除掉cookie
;;;
if not session:
if session.modified:
response.delete_cookie(
name,
domain=domain,
path=path,
secure=secure,
samesite=samesite,
httponly=httponly,
)
return
# 如果会话被访问;则添加“Vary:Cookie”头。
if session.accessed:
response.vary.add(;Cookie;)
# 是否设置cookie
if not self.should_set_cookie(app, session):
return
# 获取时间
expires = self.get_expiration_time(app, session)
# 序列化器的dumps方法将session加密得到签名
val = self.get_signing_serializer(app).dumps(dict(session)) # type: ignore
response.set_cookie(
name,
val, # type: ignore
expires=expires,
httponly=httponly,
domain=domain,
path=path,
secure=secure,
samesite=samesite,
)
如果自定义session的存放方式, 继承SessionInterface重写几个方法即可.
在为app.session_interface属性设置为自定义类的对象即可.
保存session的时候会调用save_session,
将session加密得到签名(不可变字典->字符串), 将签名存放到cookie中(session:签名字符串).
访问页面的时候从cookie中取出加签名字符串, 反解得到不可变字典.
8. 闪现
Flask的闪现功能将数据存入session中, 可以在不同的视图函数中多次存储, 获取一次之后就会删除session中的数据.
8.1 基本存储
存储:
falsh[;value;]
取所有值:
get_flashed_messages()
from flask import Flask, flash, get_flashed_messages
app = Flask(__name__)
# 闪现是基于session实现的, 所以要设secret_key参数
app.secret_key = ;asd;
;app.route(;/set_flash1;)
def set_flash1():
flash(;hello;)
return ;set_session;
;app.route(;/set_flash2;)
def set_flash2():
flash(;world!;)
return ;set_session;
;app.route(;/get_flash1;)
def get_flash1():
# 获取所以的flash 中存储的数据
print(get_flashed_messages())
return ;get_session;
;app.route(;/get_flash2;)
def get_flash2():
# 获取所以的flash 中存储的数据
print(get_flashed_messages())
return ;get_session;
if __name__ == ;__main__;:
app.run()
测试:
先访问 set_flash1 flash(;hello;) 将hello保存到session中.
再访问 set_flash1 flash(;world!;) 将world!保存到session中.
再访问 get_flash1 get_flashed_messages() 获取保存到session中的信息
再访问 get_flash2 get_flashed_messages() 已经被获取过后信息被清除了, 得到一个空列表.
8.2 分成存储
分类存储:
falsh[;value;, category=;分类名;]
按分类取值:
get_flashed_messages(category_filter=;分类名;)
get_flashed_messages(category_filter=[;分类名;])
from flask import Flask, flash, get_flashed_messages
app = Flask(__name__)
# 闪现是基于session实现的, 所以要设secret_key参数
app.secret_key = ;asd;
;app.route(;/set_flash1;)
def set_flash1():
flash(;hello;, category=;t1;)
return ;set_session;
;app.route(;/set_flash2;)
def set_flash2():
flash(;world!;, category=;t2;)
return ;set_session;
;app.route(;/get_flash1;)
def get_flash1():
# 获取所以的flash 中存储的数据
print(get_flashed_messages(category_filter=;t1;))
return ;get_session;
;app.route(;/get_flash2;)
def get_flash2():
# 获取所以的flash 中存储的数据
print(get_flashed_messages(category_filter=;t2;))
return ;get_session;
if __name__ == ;__main__;:
app.run()
测试:
先访问 set_flash1 flash(;hello;) 将hello保存到session中, 分类为t1.
再访问 set_flash1 flash(;world!;) 将world!保存到session中, 分类为t2.
再访问 get_flash1 get_flashed_messages(category_filter=;t1;) 获取保存到session中的t1分类的信息.
再访问 get_flash2 get_flashed_messages(category_filter=;t2;) 已经被获取过后所有信息被清除了, 得到一个空列表.
9. 请求拓展
请求拓展类似Django的中间件, 请求前, 请求后, 执行什么操作.
基于装饰器实现, 绑定一个函数执行一些操作.
9.1 请求来时触发
请求来时就会触发, 类似django的process_request.
有返回值, 一般为None, 如果直接返回response对象, 直接返回. 不往后执行.
;app.beform_requesr
def process_request(*args, **kwargs)
...
9.2 请求走时触发
请求走时就会触发, 类似django的process_response.
;app.after_request
def process_response(*args, **kwargs)
...
9.3 请求拓展实例
1. 实例1
请求的执行顺序, 先执行before_request, 再执行视图函数, 最后执行after_request.
from flask import Flask, session, request, redirect
app = Flask(__name__)
# 参数设置
app.debug = True
app.secret_key = ;asd;
;app.before_request
def process_request(*args, **kwargs):
# 再不知道是否有参数的时候先使用 *args, **kwargs
print(*args, **kwargs) # 空的什么都没有
# 判断访问的路径, 访问login不做处理, 其他的路径都需要处理
if request.path == ;/login;:
return None
# 没有session都重定向到登入页面
if not session.get(;name;):
# 重定向, 会想返回响应, 再由浏览器发送请求, 会先执行app.after_request
return redirect(;/login;)
;app.after_request # 需要接收response对象
def process_response(response):
# 访问index主页面的时候做处理
if request.path == ;/index;:
# Response对象 , from flask import Response, Response继承ResponseBase, ResponseBase中有一些方法
print(response) # <Response 14 bytes [200 OK]> index主页面 英文5 ; 中文3*3 9 = 14bytes
# 判断session中是否有值, 有则删除
if session.get(;name;):
session.pop(;name;)
# 将response 返回
return response
# 登入
;app.route(;/login;)
def login():
session[;name;] = ;kid;
return ;登入成功!;
# index主页面
;app.route(;/index;)
def index():
return ;index主页面;
if __name__ == ;__main__;:
app.run()
测试:
先访问 index, (这个时候session中没有值.)
第一步:
访问来时执行process_request函数, 对访问地址进行判断, 如果是;/login;直接放行,
不是则从请求中获取session, session有值则放行, 否则重定向到登入页面. --> session没有值, 执行重定向.
第二步:
重定向时, 先经过process_response含, 对访问地址进行判断, 如果是;/index; , 判断session中是否有值, 如果有则清除.
第三步:
浏览器向服务端重新发送请求, 访问到;/login;, 先执行process_request函数, --> 放行
第四步:
执行login函数, 保存session. 返回;登入成功;, 再经过process_response, --> 放行
再次访问 index, (这个时候session中有值.)
第一步:
访问来时执行process_request函数, session有值, --> 放行
第二步:
执行index函数, 返回;index主页面; 主页面, 再进过process_response, session中有值, 则将session值删除
2. 实例2
在before_request中直接返回response对象, 会执行所有响应不会执行视图函数.
from flask import Flask, make_response
app = Flask(__name__)
;app.before_request
def ip_filter():
# xx操作
res = make_response(;你的访问被限制!;)
# 直接返回response对象, 会执行所有响应不会执行视图函数
return res
;app.after_request
def after1(response):
print(;请求走了1!;)
return response
# 执行同级别的 响应
;app.after_request
def after2(response):
print(;请求走了2!;)
return response
;app.route(;/;)
def index():
return ;index;
if __name__ == ;__main__;:
app.run()
9.4 执行顺序
before_request 从上往下执行,
after_request 重下往上执行.
from flask import Flask
app = Flask(__name__)
# 参数设置
app.debug = True
;app.before_request
def process_request1(*args, **kwargs):
print(1)
;app.before_request
def process_request2(*args, **kwargs):
print(2)
;app.after_request # 需要接收response对象
def process_response1(response):
print(5)
return response
;app.after_request # 需要接收response对象
def process_response2(response):
print(4)
return response
# 登入
;app.route(;/;)
def index():
print(3)
return ;登入成功!;
if __name__ == ;__main__;:
app.run()
终端显示:
1
2
3
4
5
9.5 第一次请求触发
第一次请求时, 跟浏览器无关.
这个是django中没有的. 仅再项目启动时, 接收到第一次请求时触发, 与浏览器和用户无关.
如果写多个, 会将函数名存到一个列表中依次执行.
使用场景: 项目初始化.
;app.beffore_first_request
def first():
pass
9.6 所有请求触发
teardown_request注册一个在每个请求结束时要调用的函数, 无论该函数成功还是引发异常.
使用场景: 记录错误日志, 清理请求作用域对象(如数据库会话/事务)
from flask import Flask
app = Flask(__name__)
# 参数设置
app.debug = True
# 请求走时触发
;app.after_request
def process_request(response):
print(;执行了process_request;)
return response
# 所有请求结束时触发
;app.teardown_request
def teardown(e):
print(;执行了teardown_request;)
# 主页面
;app.route(;/;)
def index():
print(;index;)
return ;index主页面;
# order页面
;app.route(;/order;)
def order():
print(;order1;)
# 设置一个错误
a
print(;order2;)
return ;order页面;
if __name__ == ;__main__;:
app.run()
测试:
访问根目录, 先执行 index函数, 再执行process_request函数, 最后执行teardown_request函数.
再次访问;/order;, 先执行index函数, 打印;order1;, 报错..., 报错前执行teardown_request函数.
9.7 异常时触发
出现异常的时候触发, 可以根据状态码触发对应的处理函数.
;app.errorhandler(400)
def error_400():
pass
写一个函数, 处理所有的404响应状态码.
* 1. templates 下新建error.html页面
<!-- error.html页面 -->
<!DOCTYPE html>
<html lang=;en;>
<head>
<meta charset=;UTF-8;>
<title>Title</title>
</head>
<body>
{{ error_msg }}
</body>
</html>
* 2. 404状态码处理函数
from flask import Flask, render_template
app = Flask(__name__)
# 参数设置, debug模式不要开始否则不会触发errorhandler(500)
app.debug = False
;app.errorhandler(404)
def error_404(e): # 需要接收错误信息
# 404 Not Found: The requested URL was not found on the server. ...
print(e)
return render_template(;error.html;, error_msg=;404错误页面;)
;app.errorhandler(500)
def error_500(e): # 需要接收错误信息
# NameError: name ;a; is not defined
print(e)
return render_template(;error.html;, error_msg=;500错误页面;)
;app.route(;/;)
def index():
# 触发服务器500错误
a
return ;index;
if __name__ == ;__main__;:
app.run()
* 3. 测试:
访问所有不存在的路由, 会返回404, 返回时, 触发errorhandler(404).
访问根路径, 会返回500, 返回时, 触发errorhandler(500).
9.8 全局标签
;app.template_global() 全局标签再后端定义, 前端模板页面任何位置中可以调用.
from flask import Flask, render_template
app = Flask(__name__)
# 定义全局标签
;app.template_global()
def num_add(x, y):
return x ; y
;app.route(;/;)
def index():
return render_template(;index.html;)
if __name__ == ;__main__;:
app.run()
<!DOCTYPE html>
<html lang=;en;>
<head>
<meta charset=;UTF-8;>
<title>Title</title>
</head>
<body>
<!-- 再模块页面中使用 -->
{{ num_add(1, 2) }}
</body>
</html>
9.9 全局过滤器
;app.template_filter 全局标签再后端定义, 前端模板页面任何位置中可以调用.
* 注意传递参数的方式.
from flask import Flask, render_template
app = Flask(__name__)
;app.template_filter()
def num_add(x, y, z):
return x ; y ; z
;app.route(;/;)
def index():
return render_template(;index.html;)
if __name__ == ;__main__;:
app.run()
<!DOCTYPE html>
<html lang=;en;>
<head>
<meta charset=;UTF-8;>
<title>Title</title>
</head>
<body>
<!--第一个参数再 |函数名 前面-->
{{ 1|num_add(2, 3) }}
</body>
</html>