python线程安全:条件Condition(python 线程安全的数据类型)

liftword22小时前技术文章1

条件Condition

可以把Condition理解为一把高级的琐,它提供了比Lock, RLock更高级的功能,允许我们能够控制复杂的线程同步问题。threadiong.Condition在内部维护一个琐对象(默认是RLock),可以在创建Condigtion对象的时候把琐对象作为参数传入。

Condition也提供了acquire, release方法,其含义与琐的acquire, release方法一致,其实它只是简单的调用内部琐对象的对应的方法而已。

Condition还提供了如下方法(特别要注意:这些方法

  • 首先 (acquire)之后才能调用,
  • 执行 wait/notify/notifyall
  • 最后 release()否则将会报RuntimeError异常。):


  • Condition.wait([timeout]):
  • wait方法释放内部所占用的琐,同时线程被挂起,直至接收到通知被唤醒或超时(如果提供了timeout参数的话)。当线程被唤醒并重新占有琐的时候,程序才会继续执行下去。
  • Condition.notify():
  • 唤醒一个挂起的线程(如果存在挂起的线程)。注意:notify()方法不会释放所占用的琐。
  • Condition.notify_all() Condition.notifyAll()
  • 唤醒所有挂起的线程(如果存在挂起的线程)。注意:这些方法不会释放所占用的琐。

contiont的使用:实现生产者消费者

为什么要用生产者-消费者模型?

  • 缓解生产者与消费者的速度差异:只能生产一个、再消费一个这样轮换的话,如果生产者和消费者速度差异很大,就会造成等待时间过长的问题。此时可以用一个缓冲区用来存储生产者生产的数据。
  • 解耦:生产者消费者之间没有直接联系,代码不会相互影响

典型的场景:排除办事,窗口的工作人员从队列头取一个元素处理,任务结束后。队列商务部的人就离开,接着取下一个元素,然后队尾有多个人去皋陶,如果没有 控制,队列会混乱。

'''
Created on 2024年11月9日
@author: admin
'''
import random
import threading
import time
from concurrent.futures import ThreadPoolExecutor

customer_available_condition = threading.Condition()

customer_queue = []

def now():
    return time.strftime("%H:%M:%S")

def serve_customers():
    while True:
        with customer_available_condition:
            # Wait for a customer to arrive
            while not customer_queue:
                print(f"{now()}: Teller is waiting for a customer.")
                customer_available_condition.wait()

            # Serve the customer
            customer = customer_queue.pop(0)
            print(f"{now()}: Teller is serving {customer}.")

        time.sleep(random.randint(1, 5))
        print(f"{now()}: Teller has finished serving {customer}.")

def add_customer_to_queue(name):
    with customer_available_condition:
        print(f"{now()}: {name} has arrived at the bank.")
        customer_queue.append(name)

        customer_available_condition.notify()

customer_names = [
    "Customer 1",
    "Customer 2",
    "Customer 3",
    "Customer 4",
    "Customer 5",
]

with ThreadPoolExecutor(max_workers=6) as executor:
    teller_thread = executor.submit(serve_customers)
    for name in customer_names:
      
        time.sleep(random.randint(1, 3))
        executor.submit(add_customer_to_queue, name)
        
        

这是一个生产都消费者程序

  • 用户数据队列 customer_queue
  • 有一个线程执行serve_customers从队列中取用户数据并处理
  • 多个线程向队列中添加数据

队列消费者serve_customers


def serve_customers():
    while True:
        with customer_available_condition:
            # with customer_available_condition:就实现了自动acquire/rrelease
            while not customer_queue:
                print(f"{now()}: Teller is waiting for a customer.")
                customer_available_condition.wait()

            # Serve the customer
            customer = customer_queue.pop(0)
            print(f"{now()}: Teller is serving {customer}.")

        # Simulate the time taken to serve the customer
        time.sleep(random.randint(1, 5))
        print(f"{now()}: Teller has finished serving {customer}.")
  • with customer_available_condition:就实现了自动acquire/rrelease
  • while not customer_queue:当队列是空的时候时候customer_available_condition.wait(线程阻塞
  • 如果队列有数据 customer = customer_queue.pop(0)取出孤独死第一个元素。

队列生产者add_customer_to_queue

def add_customer_to_queue(name):
    with customer_available_condition:
        print(f"{now()}: {name} has arrived at the bank.")
        customer_queue.append(name)

        customer_available_condition.notify()
  • with customer_available_condition:先获取锁
  • customer_queue.append(name)给队列中添加元素
  • customer_available_condition.notify()通知消费者线程可以处理元素了


此时前面我们知道 serve_customers函数中的 customer_available_condition.wait()阻塞的地方会继续执行 while not customer_queue:判断因此生产者已经向队列中加入的了数据,所以这个while not customer_queue:会跳出,执行后面的消费队列 的元素逻辑。

def serve_customers():
    while True:
        with customer_available_condition:
            while not customer_queue:
                print(f"{now()}: Teller is waiting for a customer.")
                customer_available_condition.wait()

这里解决一个Python知道点,如果列表a 为空那么not a的值是True

>>> a =[]
>>> not a
True
>>> 

Conditon的另一种场景 Barrier

有多个任务要执行,需要等它们都执行成功后接着继续执行后面的逻辑。


import random
import threading
import time
from concurrent.futures import ThreadPoolExecutor

teller_barrier = threading.Barrier(3)

def now():
    return time.strftime("%H:%M:%S")

def prepare_for_work(name):
    print(f"{now()}: {name} is preparing their counter.")

    time.sleep(random.randint(1, 3))
    print(f"{now()}: {name} has finished preparing.")


    teller_barrier.wait()
    print(f"{now()}: {name} is now ready to serve customers.")

tellers = ["Teller 1", "Teller 2", "Teller 3"]

with ThreadPoolExecutor(max_workers=3) as executor:
    for teller_name in tellers:
        executor.submit(prepare_for_work, teller_name)

print(f"{now()}: All tellers are ready to serve customers.")
11:55:31: Teller 1 is preparing their counter.
11:55:31: Teller 2 is preparing their counter.
11:55:31: Teller 3 is preparing their counter.
11:55:32: Teller 2 has finished preparing.
11:55:33: Teller 1 has finished preparing.
11:55:34: Teller 3 has finished preparing.

11:55:34: Teller 3 is now ready to serve customers.
11:55:34: Teller 2 is now ready to serve customers.
11:55:34: Teller 1 is now ready to serve customers.


11:55:34: All tellers are ready to serve customers.

执行逻辑分为两部分

  • 线程前面的逻辑 {name} is preparing their counter.
  • teller_barrier.wait()此时线程阻塞,等待其他线程都执行到这句话
  • 线程后续逻辑:当所有的线程都执行了teller_barrier.wait()后,表示等待结算
  • 执行 name} is now ready to serve customers

有点像赛马的时候要所有的马都变位后栏栅者能打开,马儿才能跑出去。



相关文章

python 判断变量是否是 None 的三种写法

代码中经常会有变量是否为None的判断,有三种主要的写法:第一种是 if x is None ;第二种是 if not x: ;第三种是 if not x is None (这句这样理解更清晰 if...

简单学Python——关键字2——True和False

True和False是Python中的两个关键字,是布尔类型,分别用于表示真和假。1、True和False表示真和假的例子:#将1==2的结果赋值给了x x=1==2 #将1==2的结果赋值给了y y...

Python基础:pass语句知识详解(python中pass)

欢迎你来到站长在线的站长学堂学习Python知识,本文分享的是《pass语句知识详解》。pass的中文翻译:通过;走过;沿某方向前进;向某方向移动;及格;合格;通行证。在Python中表示空的语句,包...

Python的条件判断(python三个条件判断)

计算机之所以能够执行众多自动化任务,关键在于它具备自行进行条件判断的能力。例如,当输入用户年龄后,依据不同的年龄来打印相应内容,在 Python 程序里,这可以通过 if 语句来实现,示例如下:age...

用python编写判断输入是否为整数的程序

最近在自学python,写了两个小程序,大家帮个看看有没有更好的方式来编写,没学过编程,学起来有点懵。想实现一个功能,就是判断外部输入是否为整数,就是判断是不是数字,要是输入的其他的返回“您输入的信息...

python编程实践:如何将变量正确设置为空?

在Python中,变量是非常重要的一部分。它们用于储存数据,来支持程序的运行。当我们在编程时,将来可能会遇到一个问题:如何将变量正确设置为空?什么是变量?在Python中,变量是程序员用来储存数据的一...