24-3-Python多线程-线程队列-queue模块
3-1-概念
queue模块提供了多线程编程中的队列实现,队列是线程安全的数据结构,能在多线程环境下安全地进行数据交换。
3-2-queue 的队列类型
- Queue(先进先出队列)、
- LifoQueue(后进先出队列,类似栈)、
- PriorityQueue(优先队列)
3-3-Queue(先进先出队列)
3-3-1-常用方法
编号 | 方法名 | 方法说明 |
1 | __init__(maxsize=0) | 创建队列实例, `maxsize` 是队列的最大容量, 若为 0 则表示无限制 |
2 | put(item, block=True, timeout=None) | 将 `item` 放入队列。 若 `block` 为 `True` 且队列已满,会阻塞直到有空间; 若 `timeout` 有值,则最多阻塞 `timeout` 秒,超时会抛出 `Full` 异常。 若 `block` 为 `False`,队列满时会立即抛出 `Full` 异常 |
3 | get(block=True, timeout=None) | 从队列中取出并返回一个元素。 若 `block` 为 `True` 且队列为空,会阻塞直到有元素; 若 `timeout` 有值,则最多阻塞 `timeout` 秒,超时会抛出 `Empty` 异常。 若 `block` 为 `False`, 队列为空时会立即抛出 `Empty` 异常 |
4 | qsize() | 返回队列中元素的大致数量 |
5 | empty() | 判断队列是否为空,若为空返回 `True`,否则返回 `False` |
6 | full() | 判断队列是否已满,若已满返回 `True`,否则返回 `False` |
7 | Queue.task_done() | 在完成一项工作之后,函数Queue.task_done()向任务已经完成的队列发送一个信号。 这意味着之前入队的一个任务已经完成。 由队列的消费者线程调用。每一个get()调用得到一个任务时,接下来的task_done()调用告诉队列该任务已经处理完毕。 如果当前一个join() 正在阻塞,它将在队列中的所有任务都处理完时恢复执行(即每一个由put()调用入队的任务都有一个对应的task_done()调用)。 |
8 | Queue.join() | 实际上,这意味着等到队列为空时,再执行别的操作。 它会阻塞调用线程,直到队列中的所有任务处理完。 只要有数据加入队列,未完成的任务数就会增加。 当消费者线程调用task_done()时(意味着有消费者取得任务并完成任务),未完成的任务数就会减少。 当未完成的任务数降到0时,join()解除阻塞 |
3-3-2-示例代码
代码
import queue
import threading
import time
# 创建一个队列
q = queue.Queue(maxsize=3)
# 生产者线程函数
def producer():
for i in range(5):
q.put(i)
print(f"生产者放入元素 {i}")
time.sleep(1)
# 消费者线程函数
def consumer():
while True:
item = q.get()
print(f"消费者取出元素 {item}")
q.task_done() # 标记任务完成
# 创建生产者和消费者线程
producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)
# 启动线程
producer_thread.start()
consumer_thread.start()
# 等待生产者线程结束
producer_thread.join()
# 等待队列中所有任务完成
q.join()
# 结束消费者线程(这里只是示例,实际中可能需要更优雅的方式)
consumer_thread.join(timeout=1)
输出结果
3-4-LifoQueue(后进先出队列)
LifoQueue的方法和 Queue 基本相同,不过它遵循后进先出的原则。
3-4-1-示例代码
代码
import queue
# 创建一个后进先出队列
lifo_q = queue.LifoQueue()
# 放入元素
lifo_q.put(1)
lifo_q.put(2)
lifo_q.put(3)
# 取出元素
while not lifo_q.empty():
print(lifo_q.get())
输出结果
3 2 1
3-5-PriorityQueue(优先队列)
PriorityQueue中的元素会按照优先级排序,优先级低的元素先被取出。
元素一般是元组 (priority, item) 的形式,priority是优先级。
3-5-1-示例代码
代码
import queue
# 创建一个优先队列
priority_q = queue.PriorityQueue()
# 放入元素,元组第一个元素是优先级
priority_q.put((3, '任务3'))
priority_q.put((1, '任务1'))
priority_q.put((2, '任务2'))
# 取出元素
while not priority_q.empty():
print(priority_q.get())
这些队列类在多线程编程中非常实用,能帮助你实现线程间的安全数据交换和任务调度。
输出结果
3-6-数据结构-FIFO队列
3-6-1-概念
FIFO(先进先出)队列是一种常见的数据结构,遵循先进入队列的元素先出队的原则。
Python 的标准库 queue 模块提供了 Queue 类来实现 FIFO 队列,同时 collections 模块里的 deque 也能当作 FIFO 队列使用
3-6-2-应用场景
FIFO 队列在很多场景下都有应用,例如:
任务调度:在多线程或多进程编程中,可以使用 FIFO 队列来存储待执行的任务,确保任务按照提交的顺序依次执行。
消息传递:在分布式系统或多线程应用中,FIFO 队列可以用于线程间或进程间的消息传递,保证消息的顺序性。
3-6-3-常用方法
3-6-3-1-使用 queue.Queue
queue.Queue 是线程安全的,适合在多线程编程中使用
代码
import queue
# 创建一个 FIFO 队列
q = queue.Queue()
# 往队列中添加元素
q.put(1)
q.put(2)
q.put(3)
# 从队列中取出元素
while not q.empty():
item = q.get()
print(item)
输出结果
代码分析
put() 方法用于向队列添加元素,get() 方法用于从队列中取出元素,empty() 方法用于检查队列是否为空。
3-6-3-2-使用 collections.deque
collections.deque 是一个双端队列,也可以当作 FIFO 队列使用。它的操作效率较高,并且支持多线程环境。
代码
from collections import deque
# 创建一个 FIFO 队列
q = deque()
# 往队列中添加元素
q.append(1)
q.append(2)
q.append(3)
# 从队列中取出元素
while q:
item = q.popleft()
print(item)
print('------------')
q.append(1)
q.append(2)
q.append(3)
while q:
item = q.pop()
print(item)
输出结果
3-6-4-案例
代码
import queue
import threading
# 创建一个 FIFO 队列
q = queue.Queue()
# 生产者线程函数
def producer():
for i in range(5):
q.put(i)
print(f"Produced {i}")
# 消费者线程函数
def consumer():
while True:
item = q.get()
if item is None:
break
print(f"Consumed {item}")
q.task_done()
# 创建生产者和消费者线程
producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)
# 启动线程
producer_thread.start()
consumer_thread.start()
# 等待生产者线程完成
producer_thread.join()
# 发送结束信号给消费者线程
q.put(None)
# 等待消费者线程完成
consumer_thread.join()
输出结果
代码分析
生产者线程往队列中添加元素,消费者线程从队列中取出元素进行处理。
通过 task_done() 方法和 join() 方法可以确保所有任务都被处理完毕