python threading queue模块中join setDaemon及task_done的使用方法及示例

threading:

    t.setDaemon(True)  将线程设置成守护线程,主进行结束后,此线程也会被强制结束。如果线程没有设置此值,则主线程执行完毕后还会等待此线程执行。

    t.join() 线程阻塞,只有当线程运行结束后才会继续执行后续语句

示例:

#coding: utf-8
import threading
import time def foo(name):
time.sleep(2)
print 'this is %s \n' % (name,) if __name__ == '__main__':
mythread = []
for i in range(5):
t = threading.Thread(target=foo, args=(i, ))
# t.setDaemon(True)
t.start()
mythread.append(t)
# for t in mythread:
# t.join()
print '-- end --'

运行结果(注意,print为非线程安全,所以打印内容有时会比较乱):

-- end --

this is 1

this is 2

this is 4

this is 0

this is 3

Process finished with exit code 0

可以看到,主线程和子线程是独立运行的(最后一行先被打印),主线程运行结束后依旧等待子线程结束。

把上面的t.setDaemon(True)取消注释,再运行一遍,发现只打印了如下内容:

-- end --

Process finished with exit code 0

即主线程运行结束后,会强制结束掉子线程

我们继续再把下面的注释去掉

# for t in mythread:

#     t.join()

再运行一遍,输出如下:

this is 1

this is 0

this is 3

this is 4

this is 2

-- end --

Process finished with exit code 0

注意:end在最后才被输出,说明在join那里阻塞了主线程的运行,在等待子线程运行完成。

queue:

       q.put(item) 将item放入队列中。

       q.get() 从队列中取出数据。

       q.task_done() 每次从queue中get一个数据之后,当处理好相关问题,最后调用该方法,以提示q.join()是否停止阻塞,让线程向前执行或者退出;

       q.join() 阻塞,直到queue中的数据已经每项已经task_done处理到空。

       如果不使用task_done也可以,可以通过q.full() q.empty()等来判断

 

 

q.join()隐藏的问题:

       对于生产者-消费者模型,这种阻塞方式是有漏洞的,因为如果queue初始为空,q.join()会直接停止阻塞,继续执行后续语句。

       还有另一种情况,就是生产者生产速度比较慢,而消费者消费速度比较快,也有可能停止阻塞,继续执行后续语句

       如果有多个消费者,没有生产者,且queue始初化为一定的数据量,则可以正常执行。

示例:

#coding: utf-8
import Queue
import threading
import time queue = Queue.Queue(maxsize=3) def produce():
for i in range(5):
item = "item" + str(i)
queue.put(item)
print "%s produce" % (item, )
# time.sleep(4) def customer():
while True:
time.sleep(2)
item = queue.get()
print "process %s finished" % (item, )
queue.task_done() if __name__ == '__main__':
t = threading.Thread(target=produce)
t.setDaemon(True)
t.start()
for i in range(3):
c = threading.Thread(target=customer)
c.setDaemon(True)
c.start()
queue.join()
print '-- end --'

运行输出如下:

item0 produce

item1 produce

item2 produce

process item0 finished

process item2 finishedprocess item1 finished

item3 produce

item4 produce

process item3 finished

process item4 finished

-- end --

Process finished with exit code 0

可以看到程序正常结束,end字符也在最后在打印出来。

我们把produce函数中注释掉如下语句,

queue.put(item)

print "%s produce" % (item, )

运行得到:

-- end --

Process finished with exit code 0

即queue为空,join()方法并不阻塞线程。

我们在produce函数中加上sleep,让生产慢一点,去掉time.sleep(4)前面的注释,运行得到:

item0 produce

process item0 finished

-- end --

Process finished with exit code 0

有时运行有可能会得到这样的错误信息:

Exception in thread Thread-3 (most likely raised during interpreter shutdown)

报错信息表明主线程不等待子线程就结束了。

可以发现,其实produce函数中应该还有任务要生成,但因为太慢,在join语句那里检测到队列为已经全部被设置task_done,就会继续往后执行,这有可能有时并不我们需要的,针对这样的情况,我们可以在queue.join()前加多一个子句t.join()即可达成目的。

如果我们再把customer函数中的queue.task_done()函数去掉,运行得到

item0 produce

process item0 finished

item1 produce

process item1 finished

item2 produce

process item2 finished

item3 produce

process item3 finished

item4 produce

process item4 finished

注意程序一直没有结束,而最后一行end语句也没有出现。因为在join那里,虽然队列已经全部为消费完,已经为0,但是由于它不是调用task_done函数而让其计数为0,所以此时,函数会一直阻塞在这里。

上一篇:读书笔记 - js高级程序设计 - 第十五章 使用Canvas绘图


下一篇:《Java并发编程实战》第十六章 Java内存模型 读书笔记