python第三十八天,(线程回调,线程中的队列,事件,greentlet模块,gevent模块,自定义补丁) 单线程实现并发,协程

1.线程回调

在线程池/进程池
每次提交任务,都会返回一个表示任务的对象,Future对象
Future对象具备一个绑定方法,add_done_callback 用于指定回调函数
  add 意味着可以添加多个回调函数
如果直接使用Thread的话,如何完成回调

python第三十八天,(线程回调,线程中的队列,事件,greentlet模块,gevent模块,自定义补丁) 单线程实现并发,协程
from threading import Thread
import time


def call_back(res):
    print('任务结果拿到了:%s' % res)


def parser(res):
    print('任务结果拿到了:%s' % res)


def task(parser):
    print('run')
    time.sleep(1)
    res = 100  # 表示任务结果
    parser(res)  # 执行回调函数,并传入任务结果


t = Thread(target=task, args=(call_back,)) # 在这里指定parser也可以
t.start()

print('over')
View Code

 

2.线程中的队列

python第三十八天,(线程回调,线程中的队列,事件,greentlet模块,gevent模块,自定义补丁) 单线程实现并发,协程
from queue import Queue,LifoQueue,PriorityQueue

# 与进程中的Joinablequeue使用方法一模一样,但是不具备IPC

# Queue 队列
# q = Queue()
#
# # 可以往里面添加数据
# q.put('123')
# q.put('456')
#
# # 可以获取数据
# print(q.get())
# print(q.get())
#
# # 获取值,设定取值时间,超时报错
# # print(q.get(block=True,timeout=3))
#
# # 告诉队列取值已经处理完毕
# q.task_done()
# q.task_done()
# # 等待队列为空后结束队列
# q.join()
# 输出结果
# 123
# 456



# LifoQueue,翻译为last in first out 后进先出,先进后出,模拟堆栈的模式

# 除了与Queue的队列不一样之外,其他的都一样
# lq = LifoQueue()
#
# lq.put('123')
# lq.put('456')
#
# print(lq.get())
# print(lq.get())
# 输出结果
# 456
# 123


# 具备优先级的队列
# PriorityQueue
# 可以存储一个可以比较大小的对象,对象越小,优先级越高,自定义对象不能使用比较运算符,所以不能存储


# q = PriorityQueue()
# 
# q.put('b')
# q.put('a')
# 
# print(q.get())  # 会优先得到a
View Code

 

3.事件 Event()

了解Event之前,我们先了解一个案例:

python第三十八天,(线程回调,线程中的队列,事件,greentlet模块,gevent模块,自定义补丁) 单线程实现并发,协程
from threading import Thread, Event

import time

#
# is_running = False

# def boot_server():
#     global is_running
#     print('正在启动服务器。。。')
#     time.sleep(3)
#     print('服务器启动成功!')
#     is_running = True
#
#
# def connect_server():
#     while True:
#         if is_running:
#             print('连接服务器成功!')
#             break
#         else:
#             time.sleep(0.5)
#             print('error,服务器未启动')
#
#
# t1 = Thread(target=boot_server)
# t1.start()  
#
# t2 = Thread(target=connect_server)
# t2.start()
View Code

 

案例:比如说我们做一个客户端连接服务器的模型,服务器与客户端同时启动 而客户端需要访问服务器,

而服务器启动需要时间,但是两个端口却是同时开启 所以客户端不能及时访问到服务器。

 

Event方法

python线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法wait、clear、set

事件处理的机制:全局定义了一个“Flag”,如果“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,如果“Flag”值为True,那么执行event.wait 方法时便不再阻塞。

  • clear:将“Flag”设置为False

  • set:将“Flag”设置为True

用 threading.Event 实现线程间通信,使用threading.Event可以使一个线程等待其他线程的通知,我们把这个Event传递到线程对象中,

Event默认内置了一个标志,初始值为False。一旦该线程通过wait()方法进入等待状态,直到另一个线程调用该Event的set()方法将内置标志设置为True时,该Event会通知所有等待状态的线程恢复运行。

使用Event的案例:

python第三十八天,(线程回调,线程中的队列,事件,greentlet模块,gevent模块,自定义补丁) 单线程实现并发,协程
from threading import Thread, Event

import time
# 而使用Event方法就可以解决这种问题
boot_event = Event()


# boot_event.clear()    恢复事件的状态为False
# boot_event.is_set()   返回事件的状态
# boot_event.wait()     等待事件发生,就是等待事件被设置为True
# boot_event.set()      设置事件为True


def boot_server():
    print('正在启动服务器。。。')
    time.sleep(3)
    print('服务器启动成功')
    boot_event.set()  # 标记事件已经发生了,将状态设置为True


def connect_server():
    boot_event.wait()  # 等待事件发生,如果状态成为了True,会执行下面的代码
    print('连接服务器成功!')


t1 = Thread(target=boot_server)
t1.start()

t2 = Thread(target=connect_server)
t2.start()
View Code

 

引子

上一节中我们知道GIL锁将导致CPython无法利用多核CPU的优势,只能使用单核并发的执行。很明显效率不高,那有什么办法能够提高效率呢?

效率要高只有一个方法就是让这个当前线程尽可能多的占用CPU时间,如何做到?

任务类型可以分为两种 IO密集型 和 计算密集型

对于计算密集型任务而言 ,无需任何操作就能一直占用CPU直到超时为止,没有任何办法能够提高计算密集任务的效率,除非把GIL锁拿掉,让多核CPU并行执行。

对于IO密集型任务任务,一旦线程遇到了IO操作CPU就会立马切换到其他线程,而至于切换到哪个线程,应用程序是无法控制的,这样就导致了效率降低。

如何能提升效率呢?想一想如果可以监测到线程的IO操作时,应用程序自发的切换到其他的计算任务,是不是就可以留住CPU?的确如此

一、单线程实现并发

单线程实现并发这句话乍一听好像在瞎说

首先需要明确并发的定义

并发:指的是多个任务同时发生,看起来好像是同时都在进行

并行:指的是多个任务真正的同时进行

早期的计算机只有一个CPU,既然CPU可以切换线程来实现并发,那么为何不能再线程中切换任务来并发呢?

上面的引子中提到,如果一个线程能够检测IO操作并且将其设置为非阻塞,并自动切换到其他任务就可以提高CPU的利用率,指的就是在单线程下实现并发。

如何能够实现并发呢

并发 = 切换任务+保存状态,只要找到一种方案,能够在两个任务之间切换执行并且保存状态,那就可以实现单线程并发

python中的生成器就具备这样一个特点,每次调用next都会回到生成器函数中执行代码,这意味着任务之间可以切换,并且是基于上一次运行的结果,这意味着生成器会自动保存执行状态!

 

案例:yiled实现并发效果

python第三十八天,(线程回调,线程中的队列,事件,greentlet模块,gevent模块,自定义补丁) 单线程实现并发,协程
# 使用生成器来实现单线并发多个任务

import time


def func1():
    a = 1
    for i in range(1000000):
        a += 1
        print('a run')
        time.sleep(3)
        yield


def func2():
    res = func1()
    a = 1
    for i in range(1000000):
        a += 1
        print('b run')
        next(res)
st = time.time()
func2()
print(time.time() - st)

'''
对于纯计算的任务而言,单线程并发反而使执行效率下降了一半左右。所以这样的方案对于纯计算任务而言是没有必要的
我们暂且不考虑这样的并发对程序的好处是什么,在上述代码中。使用yield来切换代码结构非常混乱,如果任务太多,
而且每个都需要切换的话,那么会大大的增加时间。降低了效率,因此就有人专门对yield进行了封装,于是便有了greenlet模块
'''
View Code

 

2.greenlet模块

python第三十八天,(线程回调,线程中的队列,事件,greentlet模块,gevent模块,自定义补丁) 单线程实现并发,协程
'''
一个'greenlet’是一个小型的独立伪线程,可以把它想象成一些栈帧,栈是初始调用的函数,而栈顶是当前'greenlet'的暂停位置
你使用'greenlet' 创建一堆这样的堆栈,然后在他们之间跳转执行。跳转必须显式声明的:一个greenlet必须选择要跳转到的另一个greenlet,
这会让前一个挂起,而后一个在此前挂起处恢复执行。不同greenlets之间的跳转称为切换(switching) 。

  当你创建一个greenlet时,它得到一个开始时为空的栈;当你第一次切换到它时,它会执行指定的函数,这个函数可能会调用其他函数、切换跳出greenlet等等。
最终栈底的函数执行结束出栈时,这个greenlet的栈又变成空的,这个greenlet也就死掉了。greenlet也会因为一个未捕捉的异常死掉。
'''
import greenlet
import time


def task1():
    print('task1,run')
    g2.switch()  # 切换任务至g2
    print('task1 over')
    g2.switch()  # 切换任务至g2


def task2():
    print('task2,run')
    g1.switch()  # 切换任务至g1
    time.sleep(2)
    print('task2,over')


g1 = greenlet.greenlet(task1)
g2 = greenlet.greenlet(task2)


g1.switch()

print('主 over')

'''
该模块简化了yield复杂的代码结构,实现了单线程下多任务并发,但是无论直接使用yield还是greenlet都不能检测到IO操作
如果遇到IO时同样进入阻塞状态,所以此时的并发是没有任何意义的
因此就延申出了gevent模块,既能检测IO,又能实现单线程并发,注意的是gevent模块自身无法检测IO
'''
View Code

 

 

  协程·

协程:是单线程下的并发,又称微线程,纤程。英文名Coroutine。一句话说明什么是线程:协程是一种用户态的轻量级线程,即协程是由用户程序自己控制调度的。

需要强调的是:

1. python的线程属于内核级别的,即由操作系统控制调度(如单线程遇到io或执行时间过长就会*交出cpu执行权限,切换其他线程运行)
2. 单线程内开启协程,一旦遇到io,就会从应用程序级别(而非操作系统)控制切换,以此来提升效率(!!!非io操作的切换与效率无关)

对比操作系统控制线程的切换,用户在单线程内控制协程的切换

优点如下:

1. 协程的切换开销更小,属于程序级别的切换,操作系统完全感知不到,因而更加轻量级
2. 单线程内就可以实现并发的效果,最大限度地利用cpu

缺点如下:

1. 协程的本质是单线程下,无法利用多核,可以是一个程序开启多个进程,每个进程内开启多个线程,每个线程内开启协程来尽可能提高效率
2. 协程本质是单个线程,因而一旦协程出现阻塞,将会阻塞整个线程

 

1.geven模块 协程的使用

python第三十八天,(线程回调,线程中的队列,事件,greentlet模块,gevent模块,自定义补丁) 单线程实现并发,协程
import time

'''
gevent 不具备检测IO的能力,需要为它打补丁,打上补丁之后就能检测IO
注意补丁一定需要打在最上面,必须保证导入模块前就打好补丁
'''

from gevent import monkey

monkey.patch_all()

from threading import current_thread
import gevent


def task1():
    # print(current_thread(), 1)
    print('task1,run')

    time.sleep(3)
    print('task1 over')


def taks2():
    # print(current_thread(), 2)
    print('task2 run')
    print('task2 over')



# spawn 用于创建一个协程任务
g1 = gevent.spawn(task1)
g2 = gevent.spawn(taks2)


# 任务要执行,必须保证主线程没挂,因为所有协程任务都是主线在执行
# 必须调用join来等待协程任务,理论上等待执行时间最长的任务,
# 但是我们在执行过程中并不知道那个任务执行的时间最长,所有全部join
# 这里有一个方法可以全部join,
gevent.joinall([g1,g2])
print('over')
View Code

 

monkey () 补丁

在Python语言中,monkey patch 指的是对于一个类或者模块所进行的动态修改

 

2.自定义补丁练习

python第三十八天,(线程回调,线程中的队列,事件,greentlet模块,gevent模块,自定义补丁) 单线程实现并发,协程
import json
# 自定义json补丁

def dumps(obj):
    print('这是打完补丁后的dumps函数 哈哈哈哈!')


def loads(json_str):
    print('这是打完补丁后的loads函数 嘻嘻嘻嘻!')


def patch_json():
    json.dumps = dumps
    json.loads = loads

# 打补丁
patch_json()

# 再次调用会执行覆盖的dumps与loads方法
json.dumps('123123')  # 输出结果  '这是打完补丁后的dumps函数 哈哈哈哈!'
json.loads('123321')  # 输出结果  '这是打完补丁后的loads函数 嘻嘻嘻嘻!’
View Code

 

上一篇:163 python网络编程 - 协程(greenlet版)


下一篇:python网络-多任务实现之协程