2021/6/5 并发编程-线程2

'''
同一进程下多线程共享全局变量
守护线程:
随父线程的结束而结束,无法继续创建子线程
Thread.daemon/setDaemon(True) = True

互斥锁:
对公共资源进行加锁处理,防止出现数据安全问题
threading.Lock()
mutex.acquire()
mutex.release()

GIL全局解释器锁
对于解释器的数据安全加的一把特殊锁
1. GIL不是Python的特点而是CPython解释器的特点
2. 同一进程下的多线程只有抢到了GIL锁才能获得执行权限,如果发现访问资源未释放Lock锁,则立刻释放GIL锁
3. 对于解释型语言的通病:同进程下的多线程无法利用多核优势

多进程:适用于计算密集型,效率高于多线程
多线程:适用于IO密集型,效率高于多进程,且开销更小

死锁现象:
线程A获取到Lock锁B,继续执行需要获取到Lock锁A
线程B获取到Lock锁A,继续执行需要获取到Lock锁B
从而引发死锁现象,双方卡死

递归锁:
递归锁内部存在一个counter计数器,每当调用一次acquire,则计数器+1
每当调用一次release,则计数器-1
只有当计数器为0时,RLock锁才能被抢
threading.RLock
.acquire
.release

Semaphore信号量,在并发编程中是代指锁的概念
Semaphore可以实现创建多个锁,每个线程都可以抢到Semaphore限定数量下的锁
Semaphore内存在一个counter计数器,当调用一次release,计数器+1
当调用一次acquire,计数器-1,当计数器为0时,无法抢锁
from threading import Semaphore
s = Semaphore([value])
s.acquire()
s.release()

Event()
线程同步
from threading import Event
event = Event()
.set() True
.clear() False
.wait() True则激活线程,False则阻塞线程
.isSet()/is_set() 获取event状态值


线程队列queue模块
队列 = 管道加锁
Queue类:先入先出
LifoQueue:先入后出
PriorityQueue((int, obj)):优先级队列
.put(obj, block=True, timeout=None)
.put_nowait()
.get(block=True, timeout=None)
.get_nowait()
.qsize()
.empty()
.full()
.join()
.task_done()

进程池
from multiprocessing import Pool
p = Pool(processes=os.cpu_count())
p.apply_async(func, args=(), kwargs={})
异步提交,创建进程任务并执行,返回ApplyResult对象
.get([timeout]) 原地阻塞,获取进程任务的返回结果
.ready() 进程任务执行完成,返回True
.successful() 进程任务正常执行完成,返回True,如果在进程任务执行完成之前调用该方法,则会报错
.wait([timeout]) 等待结果变为可用
p.apply() --> p.apply_async().get()
p.close() 关闭进程池
p.join([timeout]) 原地阻塞,等待进程任务执行完毕
p.terminate() 终结进程池中所有任务的运行

进程池
from multiprocess import Pool
p = Pool(processes=os.cpu_count())
p.apply_async(func, args=(), kwargs={})
异步提交进程任务,并返回ApplyResult对象
.get([timeout]) 原地阻塞,等待进程任务的返回结果
.ready() 进程执行完毕返回True
.successful() 如果进程正常执行完毕,返回True,如果该方法在进程执行完毕之前调用,则会抛出异常
.wait([timeout]) 等待结果变为可用
p.apply(func, args=(), kwargs={})
同步提交进程任务,只有上一个进程执行完毕,才会开始下一个进程任务的执行
源码大致:p.apply_async().get()
p.close() 关闭进程池
p.join([timeout]) 等待所有子进程执行完毕,必须写在close、terminate的后面
p.terminate() 终结进程池中的所有进程任务

如果使用队列,则需要导入
from multiporcessing.Manager import Queue


进程池
from concurrent.futures import ThreadPoolExecutor
p = ThreadPoolExecutor(max_workers=os.cpu_count())
p.submit(func, *args, **kwargs)
异步提交进程任务,返回future对象
.cancel() 取消当前进程任务,如果正在运行,则无法取消返回Fasle,否则True
.cacelled() 返回进程任务是否取消成功
.running() 任务正在执行返回True
.done() 任务执行完毕或取消执行,则返回True
.result([timeout]) 阻塞原地,等待任务的返回结果
.exception([timeout]) 原地阻塞,获取当前任务引发的异常,如果没有则返回None
.add_done_callback(func) 回调函数,当前线程执行完毕后将future作为参数传递给回调函数
p.shutdown()
关闭进程池,并原地等待所有进程任务执行完毕(close, join)
p.map(func, iterable, timeout=None, chunksize=1)


from concurrent.futures import ThreadPoolExecutor
p = ThreadPoolExecutor(os.cpu_count())
p.submit(func, *args, **kwargs)
返回future类
.running()
.done()
.cancel() 终结当前进程任务,如果任务正在运行则无法终结,返沪False,否则返回True
.cancelled() 如果当前任务被成功终结,返回True,否则返回False
.result([timeout]) 阻塞原地,等待获取进程任务的返回结果
.exception([timeout]) 阻塞原地,等待进程任务的异常信息,如果任务正常执行完毕,则返回None
.add_done_callback(fn) 回调函数,任务执行完毕自动触发,将future作为参数传递给回调函数

p.shutdown() --> close join
p.map(func, iterable, timeout=None, chunksize=1)


from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
p = ThreadPoolExecutor(int=os.cpu_count())
p.submit(func)
future
.running
.done
.cancel() True False
.cancelled() True, False
.exception([])
.result([])
.add_done_callback(fun)
p.shutdown([timeout])
p.map()

协程
核心思想:单线程下实现高并发
1. 监控IO操作 gevent spawn
2. 切换CPU资源 greenlet switch
3. 保存状态 yield

from gevent import spawn
from gevent import monkey;monkey.patch_all()
g = spawn(func, *args)
Greenlet对象.join()

from greenlet import greenlet, switch
g = greenlet(func)
g.switch()

IO模型:
多路复用IO
优势在于处理多个链接
异步IO
使用回调机制,避免等待数据和拷贝数据
阻塞IO
在IO执行的两个阶段,等待数据和拷贝数据,都会被阻塞
非阻塞IO
在拷贝数据阶段被阻塞
信号驱动IO

'''
上一篇:jenkins学习10-参数化构建(构建git仓库分支)


下一篇:亲测!ROS安装过程中rosdep init && rosdep update timeout的解决办法