flask 4 flask-session、dbutils数据库连接池,wtforms基础操作

Flask day4 flask-session、dbutils数据库连接池,wtforms基础操作

内容回顾

1.jdango和flask框架的认识?
	-对于请求的处理方式不一样:django是直接以参数的形式传入,flask则是上下文管理的机制
2.flask中上下文管理机制?
	-当一个请求过来之后
	 先通过wsgi处理
	 然后会执行app.__call__方法
	 该方法又会执行wsgi_app方法
	 会创建一个ctx而ctx又是一个RequestContext的对象里面含有request和session
 	 之后会通过localstack将ctx放入local中以{线程id号:{'stactk':[ctx]}形式存储
  	 取值时是基于localproxy类再调用一个函数让localstack从local中取出ctx中的request或者session
3.为什么把请求放到RequestContext中?
	-ctx = RequestContext()——> request, session
	对于请求数据request和session会经常用放一起便于导入
4.Local对象的作用?
	-看过Local源码,和threading.local相似,但是又有不同
	-Local中可以基于协程greenlet获取唯一标识,粒度更细。
5.LocalStack对象的作用?
	-对Local对象中的数据进行操作。
	-将Local对象中的数据维护成了一个栈(先进后出)
		-local = {
			1231:{stack:[ctx,ctx...]}
		}
6.上下文管理?
	-请求上下文:request/session
	-APP和g:app/g
7.什么是g?
	-一次请求周期内的全局变量在reques_before时定义,后面可以直接调用
8.获取session/g
	-LocalProxy
9.技术:
	-反射
	-面向对象,封装
		-__dict__
	-线程(threading.local)
	-笔试:自己写一个类+列表实现栈。(LocalStack实例)
10.实例化的类可被for循环
	-该类的__iter__方法返回一个可迭代对象

class Foo:
    def __iter__(self):
        # 返回迭代器
        # return iter([11, 22, 33, 'xx'])
        # 返回生成器(特殊的迭代器)
        yield 11
        yield 22
        yield 33
        yield 'xx'

obj = Foo()
for i in obj:
    print(i)
    
>>>>
11
22
33
xx

今日内容

1.flask-session

将session改为redis session

from flask import Flask, request, session
from flask_session import RedisSessionInterface
from redis import Redis

# 默认方式将session存入cookie返回到浏览器保存
# from flask.sessions import SecureCookieSessionInterface
# app.session_interface = SecureCookieSessionInterface()

# 方式一:redis保存session可以数据保存在后台
app.session_interface = RedisSessionInterface(
    redis=Redis(host='localhost', port=6379),
    key_prefix='flask_xx'  # session中随即字符串的 flask_xx+uuid4
)
# 方式二:redis保存session
from flask_session import Session
from redis import Redis
app.config['SESSION_TYPE'] = 'redis'
app.config['SESSION_REDIS'] = Redis(host='localhost', port=6379)
Session(app)

修改session内层数据的两种方式

home.py

from flask import Blueprint, session

home = Blueprint('home', __name__)


@home.route('/index')
def index():

    # session['user_info'] = {'k1': 1, 'k2': 2}  # 会调用__setitem__方法里面会执行on_update将modified改为True

    user_info = session.get('user_info')
    print("原来的值", user_info)

    session['user_info']['k1'] = 777  # 只执行了__getitem__方法此时modified还是为False,所有不会修改cookie中的session

    user_info = session.get('user_info')
    print("修改后的值", user_info)

    # 方式一:手动将modified改为True
    session['modified'] = True
    # 方式二:将SESSION_REFRESH_EACH_REQUEST在配置中设置为True(这个还能刷新session的超时时间)、同时还要在登录成功之后还要让session.permanent = True

    return 'index'


@home.route('/test')
def test():
    user_info = session.get('user_info')
    print(user_info)
    return 'test'

account.py

from flask import Blueprint, redirect, render_template,request,session
from uuid import uuid4

account = Blueprint('account', __name__)


@account.route('/login', methods=['GET', 'POST'])
def login():
    error=''
    if request.method == 'POST':
        user = request.form.get('user')
        pwd = request.form.get('pwd')
        if user=='lem' and pwd == '123':
            uid = uuid4()
            session.permanent = True
            session['user_info'] = {'id':uid, 'name':user}
            return redirect('/index')
        else:
            error = '用户名或者密码错误'
    return render_template('login.html', error=error)

settings.py

from datetime import timedelta


class Config:
    DEBUG = True
    SECRET_KEY = 'lem'  # session加密的盐
    PERMANENT_SESSION_LIFETIME = timedelta(minutes=30)  # 超时时间30分钟失效
    SESSION_REFRESH_EACH_REQUEST = True  # 每次更新session重复保存以保证session超时时间改变


class DevelopmentConfig(Config):
    pass


class ProductionConfig(Config):
    pass


class TestingConfig(Config):
    pass

总结flask_session:

作用:将默认保存的签名cookie中的值保存到 redis/memcached

应用:
	-方式一:
        -配置
            -app.config['SESSION_TYPE'] = 'redis'
            -app.config['SESSION_REDIS'] = Redis(host='localhost', port=6379)
        -替换
            -from flask_session import Session
            -Session(app)
     -方式二:
     	-from flask_session import RedisSessionInterface
     	-app.session_interface = RedisSessionInterface(redis=Redis(host='localhost',port=6379), key_prefix='flask_xx'  )
	
	注意:session中存储的是字典,修改字典内部元素时,会造成数据不更新。
		-1.modified=True
		-2.SESSION_REFRESH_EACH_REQUEST = True and session.permanent = True

2.数据库连接池:DBUtils(pymysql)

了解DBUtils

模式:
	-每个线程创建一个链接,关闭(默认不关闭),线程终止时,才关闭链接
	-创建共享连接池
应用:
	-只要写原生SQL,就要用数据库链接池
import pymysql
from dbutils.pooled_db import PooledDB

POOL = PooledDB(
    creator=pymysql,  # 使用连接数据库的模块
    maxconnections=6,  # 连接池允许的最大连接数,0和None表示不限制连接数
    mincached=2,  # 初始化时,连接池中至少创建的空闲的连接,0表示不创建
    maxcached=5,  # 连接池中最多闲置的连接,0和None不限制
    # 连接池中最多共享的连接数量,0和None表示全部共享.PS:无用,因为pymysql和MYSQLdb等模块的threadsafety = 1,而threadsafety=1时_maxshared默认为0,所有永远都是所有连接都共享
    maxshared=3,
    blocking=True,  # 连接池中如果没有可用的连接后,是否阻塞等待。True,等待;False,不等待然后报错
    maxusage=None,  # 一个连接池最多被重复使用的次数,None表示无限制
    setsession=[],  # 开始会话前执行的命令列表。如:["set datestyle to...", "set time zone..."]
    # ping Mysql服务端,检查是否服务可用。如:0=None=never,1=default=whenever it is requested,2=when a cursor is created, 4=when a query is executed,7=always
    ping=0,  # 工作中一般都是0/4/7

    host='localhost',
    port=3306,
    user='root',
    password='123456',
    database='flask',
    charset='utf8'
)


def func():
    """
    检测当前正在运行连接数是否小于最大连接数,如果不小于则:等待或者抛出raise TooManyConnections异常
    否则优先去初始化时创建的连接中获取连接,SteadyDBConnection
    然后将SteadyDBConnection对象封装到PooledDedicatedDBConnection中并返回
    如果最开始创建的链接没有链接,则去创建一个SteadyDBConnection对象,再封装到PooledDedicatedDBConnection中并返回
    一旦关闭链接后,连接就返回到连接池让后续线程继续使用
    :return:
    """
    conn = POOL.connection()

    # print(th, "连接被拿走了", conn1._con)
    # print(th, "池子里目前有", pool.idle_cache,'\r\n')

    cursor = conn.cursor(pymysql.cursors.DictCursor)

    cursor.execute("select * from user")
    res = cursor.fetchall()
    conn.close()

    return res

用于flask中

配置数据库的参数
class Config:
    DEBUG = True
    SECRET_KEY = 'lem'  # session加密的盐
    PERMANENT_SESSION_LIFETIME = timedelta(minutes=30)  # 超时时间30分钟失效
    SESSION_REFRESH_EACH_REQUEST = True  # 每次更新session重复保存以保证session超时时间改变
    SESSION_TYPE = 'redis'


class DevelopmentConfig(Config):
    ENV = 'development'
    SESSION_REDIS = Redis(host='192.168.11.213', port=6379)
    SESSION_KEY_PREFIX='lem'
    PYMYSQL_HOST = "localhost"
    PYMYSQL_PORT = 3306
    PYMYSQL_USER = 'root'
    PYMYSQL_PASSWORD = '123456'
    PYMYSQL_DATABASE = 'flask'
    PYMYSQL_CHARSET = 'utf8'
将数据里连接池放到app.config

类似于redis(注意app创建时间和连接池创建时调用app的配置信息的顺序)

pool.py

import pymysql
from dbutils.pooled_db import PooledDB


def init_pool(app):
    POOL = PooledDB(
        creator=pymysql,  # 使用连接数据库的模块
        maxconnections=6,  # 连接池允许的最大连接数,0和None表示不限制连接数
        mincached=2,  # 初始化时,连接池中至少创建的空闲的连接,0表示不创建
        maxcached=5,  # 连接池中最多闲置的连接,0和None不限制
        # 连接池中最多共享的连接数量,0和None表示全部共享.PS:无用,因为pymysql和MYSQLdb等模块的threadsafety = 1,而threadsafety=1时_maxshared默认为0,所有永远都是所有连接都共享
        maxshared=3,
        blocking=True,  # 连接池中如果没有可用的连接后,是否阻塞等待。True,等待;False,不等待然后报错
        maxusage=None,  # 一个连接池最多被重复使用的次数,None表示无限制
        setsession=[],  # 开始会话前执行的命令列表。如:["set datestyle to...", "set time zone..."]
        # ping Mysql服务端,检查是否服务可用。如:0=None=never,1=default=whenever it is requested,2=when a cursor is created, 4=when a query is executed,7=always
        ping=0,  # 工作中一般都是0/4/7
	
        # 读取app的配置文件中的参数
        host=app.config['PYMYSQL_HOST'],
        port=app.config['PYMYSQL_PORT'],
        user=app.config['PYMYSQL_USER'],
        password=app.config['PYMYSQL_PASSWORD'],
        database=app.config['PYMYSQL_DATABASE'],
        charset=app.config['PYMYSQL_CHARSET']
    )
    # 给app的上面的POOL添加到app的config中
    app.config['PYMYSQL_POOL'] = POOL

__init__.py

from flask import Flask
from .views import account, home
import settings
from flask_session import Session
from apps.utils.mysql_pool import init_pool


def create_user():
    app = Flask(__name__)

    app.config.from_object(settings.DevelopmentConfig)

    app.register_blueprint(account.account)
    app.register_blueprint(home.home)

    # 将session替换成redis session
    # Session(app)

    init_pool(app)  # 将app以参数的形式传到上面的init_pool中,该方法与上面Session(app)类似

    return app

helper.py

import pymysql
from flask import current_app

# 创建一个操作sql的类,让需要执行sql的地方导入即可
class MysqlHelper:

    @staticmethod
    def open():
        POOL = current_app.config['PYMYSQL_POOL']
        conn = POOL.connection()
        cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
        return conn, cursor

    @staticmethod
    def close(conn, cursor):
        conn.commit()
        cursor.close()
        conn.close()

    @classmethod
    def fetch_one(cls, sql, args):
        conn, cursor = cls.open()
        cursor.execute(sql, args)
        obj = cursor.fetchone()
        cls.close(conn, cursor)
        return obj

    @classmethod
    def fetch_all(cls, sql, args):
        conn, cursor = cls.open()
        cursor.execute(sql, args)
        obj = cursor.fetchall()
        cls.close(conn, cursor)
        return obj

3.wtforms

作用:用于对python web框架做表单验证
使用:
	class MyForm(Form):
		user = 类(正则,插件)
		字段 = 类(正则,插件)
		字段 = 类(正则,插件)
		字段 = 类(正则,插件)
		字段 = 类(正则,插件)
		字段 = 类(正则,插件)
	obj = MyForm()
	# 生成HTML标签
	form.obj  # 会调用类.__str__方法==> 插件.xx方法
	# 验证
	obj = MyForm(formdata=request.form)
	if form.validate():
		# 内部找到所有的字段:user+用户发过来的数据=>正则校验

wtforms登录校验

from wtforms import Form
from wtforms.fields import simple, core, html5
from wtforms import validators
from wtforms import widgets

# 前端提交post请求时不做校验 form表单加novalidate
class LoginForm(Form):
    user = simple.StringField(
        label='用户名',
        validators=[
            validators.DataRequired(message='用户名不能为空.'),
            # validators.Length(min=6, max=18, message='用户名长度必须大于%(min)d且小于%(max)d')
        ],
        widget=widgets.TextInput(),
        render_kw={'class': 'form-control'}

    )
    pwd = simple.PasswordField(
        label='密码',
        validators=[
            validators.DataRequired(message='密码不能为空.'),
            # validators.Length(min=8, message='用户名长度必须大于%(min)d'),
            # validators.Regexp(regex="^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[$@$!%*?&])[A-Za-z\d$@$!%*?&]{8,}",
            #                   message='密码至少8个字符,至少1个大写字母,1个小写字母,1个数字和1个特殊字符')

        ],
        widget=widgets.PasswordInput(),
        render_kw={'class': 'form-control'}
    )

@account.route('/login', methods=['GET', 'POST'])
def login():
    form = LoginForm()
    if request.method == 'POST':
        form = LoginForm(formdata=request.form)
        if form.validate():
            print("用户提交数据通过校验,提交的数据为:", form.data)  # {'user': 'hina', 'pwd': '123'}
            obj = MysqlHelper.fetch_one(
                "select id, username from user where username=%(username)s and password=%(password)s", form.data)
            if obj:
                session.permanent = True
                session['user_info'] = {'id': obj['id'], 'name': obj['username']}
                return redirect('/index')

    return render_template('login.html', form=form)

wtforms注册校验

from wtforms import Form, StringField, PasswordField, RadioField, SelectField, SelectMultipleField
from wtforms.fields import simple, core, html5
from wtforms import validators
from wtforms import widgets
import email_validator

class RegisterForm(Form):
    name = simple.StringField(
        label='用户名',
        validators=[
            validators.DataRequired()
        ],
        widget=widgets.TextInput(),
        render_kw={'class': 'form-control'},
        default='hina'
    )

    pwd = simple.PasswordField(
        label='密码',
        validators=[
            validators.DataRequired(message='密码不能为空.')
        ],
        widget=widgets.PasswordInput(),
        render_kw={'class': 'form-control'}
    )

    pwd_confirm = simple.PasswordField(
        label='重复密码',
        validators=[
            validators.DataRequired(message='重复密码不能为空.'),
            validators.EqualTo('pwd', message="两次密码输入不一致")
        ],
        widget=widgets.PasswordInput(),
        render_kw={'class': 'form-control'}
    )

    email = html5.EmailField(
        label='邮箱',
        validators=[
            validators.DataRequired(message='邮箱不能为空.'),
            validators.Email(message='邮箱格式错误'),
        ],
        widget=widgets.TextInput(input_type='email'),
        render_kw={'class': 'form-control'}
    )

    gender = core.RadioField(
        label='性别',
        choices=(
            (1, '男'),
            (2, '女'),
        ),
        coerce=int
    )
    city = core.SelectField(
        label='城市',
        choices=MysqlHelper.fetch_all("select id, name from city", {}, None),
           #
           #     ('bj', '北京'),
           #     ('sh', '上海'),
           # )
           coerce = int
    )

    hobby = core.SelectMultipleField(
        label='爱好',
        choices=(
            (1, '篮球'),
            (2, '足球'),
        ),
        coerce=int
    )

    favor = core.SelectMultipleField(
        label='喜好',
        choices=(
            (1, '篮球'),
            (2, '足球'),
        ),
        widget=widgets.ListWidget(prefix_label=False),
        option_widget=widgets.CheckboxInput(),
        coerce=int,
        default=[1, 2]
    )

    def __init__(self, *args, **kwargs):
        super(RegisterForm, self).__init__(*args, **kwargs)
        self.city.choices = MysqlHelper.fetch_all("select id, name from city", {}, None)

    def validate_pwd_confirm(self, field):
        """
        自定义pwd_confirm字段规则,例:与pwd字段是否一致
        :param field:
        :return:
        """
        # 最开始初始化时,self.data中已经有所有的值

        if field.data != self.data['pwd']:
            # raise validators.ValidationError("密码不一致") # 继续后续验证
            raise validators.StopValidation("密码不一致")  # 不再继续后续验证

    def validate_name(self, field):
        print(field.data)  # 当前name传来的值
        print(self.data)  # 当前传来的所有值 name, gender ....

        obj = MysqlHelper.fetch_one("select id from user where username=%s", (field.data,))
        if obj:
            # raise validators.ValidationError("用户名已存在") # 继续后续验证
            raise validators.StopValidation("用户名已存在")  # 不再继续后续验证


@account.route('/register', methods=['GET', 'POST'])
def register():
    form = RegisterForm()
    if request.method == 'POST':
        form = RegisterForm(formdata=request.form)
        if form.validate():
            print(form.data)
        else:
            print(form.errors)

    return render_template('register.html', form=form)
# 由于在请求到来之前先要去数据库获取city的id和name所以又出现之前的获取不到current_app注册的配置PYMYSQL_POOL需要直接写在配置文件中导入
class DevelopmentConfig(Config):
    ENV = 'development'
    SESSION_REDIS = Redis(host='192.168.11.213', port=6379)
    SESSION_KEY_PREFIX = 'lem'
    PYMYSQL_HOST = "localhost"
    PYMYSQL_PORT = 3306
    PYMYSQL_USER = 'root'
    PYMYSQL_PASSWORD = '123456'
    PYMYSQL_DATABASE = 'flask'
    PYMYSQL_CHARSET = 'utf8'
    PYMYSQL_CONFIG = {"host": 'localhost', "user": 'root', "password": '123456', 'port': 3306, 'db': 'flask',
                      'charset': 'utf8'}
    PYMYSQL_POOL = PooledDB(
        creator=pymysql,  # 使用连接数据库的模块
        maxconnections=6,  # 连接池允许的最大连接数,0和None表示不限制连接数
        mincached=2,  # 初始化时,连接池中至少创建的空闲的连接,0表示不创建
        maxcached=5,  # 连接池中最多闲置的连接,0和None不限制
        # 连接池中最多共享的连接数量,0和None表示全部共享.PS:无用,因为pymysql和MYSQLdb等模块的threadsafety = 1,而threadsafety=1时_maxshared默认为0,所有永远都是所有连接都共享
        maxshared=0,
        blocking=True,  # 连接池中如果没有可用的连接后,是否阻塞等待。True,等待;False,不等待然后报错
        maxusage=None,  # 一个连接池最多被重复使用的次数,None表示无限制
        setsession=[],  # 开始会话前执行的命令列表。如:["set datestyle to...", "set time zone..."]
        # ping Mysql服务端,检查是否服务可用。如:0=None=never,1=default=whenever it is requested,2=when a cursor is created, 4=when a query is executed,7=always
        ping=0,  # 工作中一般都是0/4/7

        host="localhost",
        port=3306,
        user="root",
        password="123456",
        database="flask",
        charset="utf8",
    )

总结:

-上下文管理
-falsk-session
-wtforms
	-name = simple.StringField()
			UnboundField(StringField, 计数器)
	-FormMeta.__call__
上一篇:python – 将今天的日期作为默认值


下一篇:python-在调用DateField的验证程序之前更改field.data吗?