python线程安全:条件Condition(python 线程安全的数据类型)
条件Condition
可以把Condition理解为一把高级的琐,它提供了比Lock, RLock更高级的功能,允许我们能够控制复杂的线程同步问题。threadiong.Condition在内部维护一个琐对象(默认是RLock),可以在创建Condigtion对象的时候把琐对象作为参数传入。
Condition也提供了acquire, release方法,其含义与琐的acquire, release方法一致,其实它只是简单的调用内部琐对象的对应的方法而已。
Condition还提供了如下方法(特别要注意:这些方法
- 首先 (acquire)之后才能调用,
- 执行 wait/notify/notifyall
- 最后 release()否则将会报RuntimeError异常。):
- Condition.wait([timeout]):
- wait方法释放内部所占用的琐,同时线程被挂起,直至接收到通知被唤醒或超时(如果提供了timeout参数的话)。当线程被唤醒并重新占有琐的时候,程序才会继续执行下去。
- Condition.notify():
- 唤醒一个挂起的线程(如果存在挂起的线程)。注意:notify()方法不会释放所占用的琐。
- Condition.notify_all() Condition.notifyAll()
- 唤醒所有挂起的线程(如果存在挂起的线程)。注意:这些方法不会释放所占用的琐。
contiont的使用:实现生产者消费者
为什么要用生产者-消费者模型?
- 缓解生产者与消费者的速度差异:只能生产一个、再消费一个这样轮换的话,如果生产者和消费者速度差异很大,就会造成等待时间过长的问题。此时可以用一个缓冲区用来存储生产者生产的数据。
- 解耦:生产者消费者之间没有直接联系,代码不会相互影响
典型的场景:排除办事,窗口的工作人员从队列头取一个元素处理,任务结束后。队列商务部的人就离开,接着取下一个元素,然后队尾有多个人去皋陶,如果没有 控制,队列会混乱。
'''
Created on 2024年11月9日
@author: admin
'''
import random
import threading
import time
from concurrent.futures import ThreadPoolExecutor
customer_available_condition = threading.Condition()
customer_queue = []
def now():
return time.strftime("%H:%M:%S")
def serve_customers():
while True:
with customer_available_condition:
# Wait for a customer to arrive
while not customer_queue:
print(f"{now()}: Teller is waiting for a customer.")
customer_available_condition.wait()
# Serve the customer
customer = customer_queue.pop(0)
print(f"{now()}: Teller is serving {customer}.")
time.sleep(random.randint(1, 5))
print(f"{now()}: Teller has finished serving {customer}.")
def add_customer_to_queue(name):
with customer_available_condition:
print(f"{now()}: {name} has arrived at the bank.")
customer_queue.append(name)
customer_available_condition.notify()
customer_names = [
"Customer 1",
"Customer 2",
"Customer 3",
"Customer 4",
"Customer 5",
]
with ThreadPoolExecutor(max_workers=6) as executor:
teller_thread = executor.submit(serve_customers)
for name in customer_names:
time.sleep(random.randint(1, 3))
executor.submit(add_customer_to_queue, name)
这是一个生产都消费者程序
- 用户数据队列 customer_queue
- 有一个线程执行serve_customers从队列中取用户数据并处理
- 多个线程向队列中添加数据
队列消费者serve_customers
def serve_customers():
while True:
with customer_available_condition:
# with customer_available_condition:就实现了自动acquire/rrelease
while not customer_queue:
print(f"{now()}: Teller is waiting for a customer.")
customer_available_condition.wait()
# Serve the customer
customer = customer_queue.pop(0)
print(f"{now()}: Teller is serving {customer}.")
# Simulate the time taken to serve the customer
time.sleep(random.randint(1, 5))
print(f"{now()}: Teller has finished serving {customer}.")
- with customer_available_condition:就实现了自动acquire/rrelease
- while not customer_queue:当队列是空的时候时候customer_available_condition.wait(线程阻塞
- 如果队列有数据 customer = customer_queue.pop(0)取出孤独死第一个元素。
队列生产者add_customer_to_queue
def add_customer_to_queue(name):
with customer_available_condition:
print(f"{now()}: {name} has arrived at the bank.")
customer_queue.append(name)
customer_available_condition.notify()
- with customer_available_condition:先获取锁
- customer_queue.append(name)给队列中添加元素
- customer_available_condition.notify()通知消费者线程可以处理元素了
此时前面我们知道 serve_customers函数中的 customer_available_condition.wait()阻塞的地方会继续执行 while not customer_queue:判断因此生产者已经向队列中加入的了数据,所以这个while not customer_queue:会跳出,执行后面的消费队列 的元素逻辑。
def serve_customers():
while True:
with customer_available_condition:
while not customer_queue:
print(f"{now()}: Teller is waiting for a customer.")
customer_available_condition.wait()
这里解决一个Python知道点,如果列表a 为空那么not a的值是True
>>> a =[]
>>> not a
True
>>>
Conditon的另一种场景 Barrier
有多个任务要执行,需要等它们都执行成功后接着继续执行后面的逻辑。
import random
import threading
import time
from concurrent.futures import ThreadPoolExecutor
teller_barrier = threading.Barrier(3)
def now():
return time.strftime("%H:%M:%S")
def prepare_for_work(name):
print(f"{now()}: {name} is preparing their counter.")
time.sleep(random.randint(1, 3))
print(f"{now()}: {name} has finished preparing.")
teller_barrier.wait()
print(f"{now()}: {name} is now ready to serve customers.")
tellers = ["Teller 1", "Teller 2", "Teller 3"]
with ThreadPoolExecutor(max_workers=3) as executor:
for teller_name in tellers:
executor.submit(prepare_for_work, teller_name)
print(f"{now()}: All tellers are ready to serve customers.")
11:55:31: Teller 1 is preparing their counter.
11:55:31: Teller 2 is preparing their counter.
11:55:31: Teller 3 is preparing their counter.
11:55:32: Teller 2 has finished preparing.
11:55:33: Teller 1 has finished preparing.
11:55:34: Teller 3 has finished preparing.
11:55:34: Teller 3 is now ready to serve customers.
11:55:34: Teller 2 is now ready to serve customers.
11:55:34: Teller 1 is now ready to serve customers.
11:55:34: All tellers are ready to serve customers.
执行逻辑分为两部分
- 线程前面的逻辑 {name} is preparing their counter.
- teller_barrier.wait()此时线程阻塞,等待其他线程都执行到这句话
- 线程后续逻辑:当所有的线程都执行了teller_barrier.wait()后,表示等待结算
- 执行 name} is now ready to serve customers
有点像赛马的时候要所有的马都变位后栏栅者能打开,马儿才能跑出去。