Python并发编程中的设计模式(python 并发编程)

liftword · 18小时前 · 技术文章 · 6

在现代软件开发中,并发编程已经成为提升应用性能和用户体验的关键技术。随着多核处理器的普及和分布式系统的广泛应用,掌握并发编程的设计模式变得越来越重要。本文将深入探讨Python中常用的并发编程设计模式,包括生产者-消费者模式、线程池模式、工作者模式和读写锁模式等。通过这些模式的学习,开发者可以更好地解决并发编程中的常见问题,构建高效且可维护的并发系统。

一、生产者-消费者模式

1. 基础实现

生产者-消费者模式是并发编程中最基础且最常用的设计模式之一。这种模式通过将任务的生产和消费解耦,使系统能够独立地调整生产和消费的速率,从而提高系统的灵活性和可扩展性。在Python中,可以使用Queue来实现线程安全的数据传输,同时使用线程或协程来处理生产和消费逻辑。

下面的代码展示了一个典型的生产者-消费者模式实现:

import queue
import threading
import time
from typing import List, Any
import random

class ProductionSystem:
    """Thread-based producer/consumer system built on a bounded queue.

    Producer threads call a user function to create items and enqueue
    them; consumer threads dequeue items and hand them to a user
    function. `stop()` signals every thread via an Event and joins them.
    """

    def __init__(self, queue_size: int = 10):
        """Initialize the production system.

        Args:
            queue_size: maximum number of items buffered between
                producers and consumers (the back-pressure bound).
        """
        self.queue = queue.Queue(maxsize=queue_size)
        self.should_stop = threading.Event()
        self.producers: List[threading.Thread] = []
        self.consumers: List[threading.Thread] = []

    def add_producer(self, producer_fn, name: str = None):
        """Start a new producer thread that runs `producer_fn` in a loop."""
        producer = threading.Thread(
            target=self._producer_wrapper,
            args=(producer_fn,),
            name=name or f"Producer-{len(self.producers)}"
        )
        self.producers.append(producer)
        producer.start()

    def add_consumer(self, consumer_fn, name: str = None):
        """Start a new consumer thread that runs `consumer_fn` in a loop."""
        consumer = threading.Thread(
            target=self._consumer_wrapper,
            args=(consumer_fn,),
            name=name or f"Consumer-{len(self.consumers)}"
        )
        self.consumers.append(consumer)
        consumer.start()

    def _producer_wrapper(self, producer_fn):
        """Producer loop: honors the stop signal and reports errors.

        Bug fix: the original dropped the freshly produced item whenever
        the queue was full (on retry it called `producer_fn` again). We
        now keep the pending item and retry the put until it fits or the
        system stops.
        """
        sentinel = object()  # distinguishes "no pending item" from a None item
        pending = sentinel
        while not self.should_stop.is_set():
            try:
                if pending is sentinel:
                    pending = producer_fn()
                self.queue.put(pending, timeout=1)
                pending = sentinel  # successfully enqueued
            except queue.Full:
                continue  # retry the same item on the next iteration
            except Exception as e:
                print(f"生产者发生错误: {e}")
                break

    def _consumer_wrapper(self, consumer_fn):
        """Consumer loop: honors the stop signal and reports errors.

        Bug fix: `task_done()` is now guaranteed even when `consumer_fn`
        raises, so `queue.join()` can no longer hang on a failed item.
        """
        while not self.should_stop.is_set():
            try:
                item = self.queue.get(timeout=1)
            except queue.Empty:
                continue
            try:
                consumer_fn(item)
            except Exception as e:
                print(f"消费者发生错误: {e}")
                break
            finally:
                self.queue.task_done()

    def stop(self):
        """Signal all producers and consumers to stop, then join them."""
        self.should_stop.set()
        for thread in self.producers + self.consumers:
            thread.join()

def producer_fn():
    """Simulate a data source: wait a random sub-second delay, then
    emit a random integer in the range [1, 100]."""
    delay = random.random()
    time.sleep(delay)
    return random.randint(1, 100)

def consumer_fn(item):
    """Simulate item processing: pause for a random sub-second delay,
    then report the processed item."""
    pause = random.random()
    time.sleep(pause)
    print(f"处理数据: {item}")

# Demo: wire one producer and one consumer to a queue bounded at 5 items.
system = ProductionSystem(queue_size=5)
system.add_producer(producer_fn)
system.add_consumer(consumer_fn)

time.sleep(10)  # let the demo run for 10 seconds before shutting down
system.stop()

输出结果:

处理数据: 11
处理数据: 54
处理数据: 28
处理数据: 54
处理数据: 64
处理数据: 49
处理数据: 2
处理数据: 30
处理数据: 73
处理数据: 91
处理数据: 86
处理数据: 48
处理数据: 19
处理数据: 91
处理数据: 91
处理数据: 80
处理数据: 24
处理数据: 76
处理数据: 98
处理数据: 71

2. 异步实现

在现代Python应用中,使用异步编程模型可以更好地处理IO密集型任务。

以下是使用asyncio实现的生产者-消费者模式,它能够更高效地处理大量并发操作:

import asyncio
import random
from asyncio import Queue
from typing import Callable, Coroutine, Any


class AsyncProductionSystem:
    """Asyncio-based producer/consumer system.

    Bug fix: `stop()` used to have no effect on coroutines blocked in
    `queue.get()` / `queue.put()` — they awaited forever. Both waits are
    now bounded with `asyncio.wait_for`, so every worker loop re-checks
    `self.running` at least every `_POLL_TIMEOUT` seconds.
    """

    # How long a blocked get/put waits before re-checking `running`.
    _POLL_TIMEOUT = 0.1

    def __init__(self, queue_size: int = 10):
        """Create the bounded queue and bookkeeping state."""
        self.queue = Queue(maxsize=queue_size)
        self.producers = []
        self.consumers = []
        self.running = False

    async def start(self, num_producers: int, num_consumers: int,
                    producer_fn: Callable[[], Coroutine[Any, Any, Any]],
                    consumer_fn: Callable[[Any], Coroutine[Any, Any, None]]):
        """Spawn the requested workers and wait for all of them to finish.

        Note: this blocks until `stop()` is called and every worker has
        exited, so callers that need to keep control should run it as a
        background task.
        """
        self.running = True

        # Create producer and consumer tasks.
        producer_tasks = [
            asyncio.create_task(self._producer_loop(producer_fn))
            for _ in range(num_producers)
        ]
        consumer_tasks = [
            asyncio.create_task(self._consumer_loop(consumer_fn))
            for _ in range(num_consumers)
        ]

        # Wait for every worker to finish.
        await asyncio.gather(*producer_tasks, *consumer_tasks)

    async def _producer_loop(self, producer_fn):
        """Produce items until stopped; retry (not drop) on a full queue."""
        sentinel = object()  # distinguishes "no pending item" from a None item
        pending = sentinel
        while self.running:
            try:
                if pending is sentinel:
                    pending = await producer_fn()
                await asyncio.wait_for(self.queue.put(pending), self._POLL_TIMEOUT)
                print(f"生产者生产了: {pending}")
                pending = sentinel
            except asyncio.TimeoutError:
                continue  # queue full — keep the item and re-check `running`
            except Exception as e:
                print(f"生产者异常: {e}")
                break

    async def _consumer_loop(self, consumer_fn):
        """Consume items until stopped and the queue is drained."""
        while self.running or not self.queue.empty():
            try:
                item = await asyncio.wait_for(self.queue.get(), self._POLL_TIMEOUT)
            except asyncio.TimeoutError:
                continue  # nothing queued yet — re-check `running`
            try:
                await consumer_fn(item)
                print(f"消费者处理了: {item}")
            except Exception as e:
                print(f"消费者异常: {e}")
                break
            finally:
                # Bug fix: always mark the item done so queue.join()
                # cannot hang on a failed handler.
                self.queue.task_done()

    def stop(self):
        """Ask every worker loop to exit at its next poll."""
        self.running = False
        print("停止信号已发出,等待所有任务完成...")

    async def wait_until_done(self):
        """Block until every enqueued item has been processed."""
        await self.queue.join()
        print("所有任务已完成")


async def producer_fn():
    """Asynchronously produce one random integer in [1, 100] after a
    random sub-second delay (simulated I/O)."""
    delay = random.random()
    await asyncio.sleep(delay)
    return random.randint(1, 100)


async def consumer_fn(item):
    """Asynchronously 'process' *item*: wait a random sub-second delay,
    then report it."""
    pause = random.random()
    await asyncio.sleep(pause)
    print(f"处理数据: {item}")


async def main():
    """Entry point: run the system for 10 seconds, then shut it down.

    Bug fix: `system.start(...)` blocks until every worker task has
    finished, so the original code never reached `sleep`/`stop()`. The
    system is now started as a background task; after the stop signal we
    drain the queue and cancel any worker still blocked on it.
    """
    system = AsyncProductionSystem(queue_size=5)
    runner = asyncio.create_task(
        system.start(num_producers=3, num_consumers=2,
                     producer_fn=producer_fn, consumer_fn=consumer_fn)
    )
    await asyncio.sleep(10)  # run for 10 seconds
    system.stop()
    await system.wait_until_done()
    runner.cancel()  # release any worker still blocked on the queue
    try:
        await runner
    except asyncio.CancelledError:
        pass


# Run the program.
# Bug fix: the original line fused two statements into
# `asyncio.run(main())import asyncio`, which is a syntax error — the
# call and the next listing's imports are now on separate lines.
asyncio.run(main())

import asyncio
from asyncio import Queue
from typing import Callable, Coroutine, Any

class AsyncProductionSystem:
    """Asyncio producer/consumer system (simplified variant).

    Bug fix: `stop()` previously could not interrupt coroutines blocked
    in `queue.get()` / `queue.put()`; both waits are now bounded with
    `asyncio.wait_for` so the loops re-check `self.running` regularly.
    """

    # Seconds a blocked get/put waits before re-checking `running`.
    _POLL_TIMEOUT = 0.1

    def __init__(self, queue_size: int = 10):
        """Create the bounded queue and bookkeeping state."""
        self.queue = Queue(maxsize=queue_size)
        self.producers = []
        self.consumers = []
        self.running = False

    async def start(self, num_producers: int, num_consumers: int,
                    producer_fn: Callable[[], Coroutine[Any, Any, Any]],
                    consumer_fn: Callable[[Any], Coroutine[Any, Any, None]]):
        """Spawn the requested workers and wait for all of them to exit."""
        self.running = True

        # Create producer and consumer tasks.
        producer_tasks = [
            asyncio.create_task(self._producer_loop(producer_fn))
            for _ in range(num_producers)
        ]
        consumer_tasks = [
            asyncio.create_task(self._consumer_loop(consumer_fn))
            for _ in range(num_consumers)
        ]

        # Wait for every worker to finish.
        await asyncio.gather(*producer_tasks, *consumer_tasks)

    async def _producer_loop(self, producer_fn):
        """Produce items until stopped; retry (not drop) on a full queue."""
        sentinel = object()
        pending = sentinel
        while self.running:
            try:
                if pending is sentinel:
                    pending = await producer_fn()
                await asyncio.wait_for(self.queue.put(pending), self._POLL_TIMEOUT)
                pending = sentinel
            except asyncio.TimeoutError:
                continue  # queue full — keep the item, re-check `running`
            except Exception as e:
                print(f"生产者异常: {e}")
                break

    async def _consumer_loop(self, consumer_fn):
        """Consume items until stopped.

        Bug fix: `task_done()` is now guaranteed via `finally`, so a
        failing handler can no longer leave `queue.join()` hanging.
        """
        while self.running:
            try:
                item = await asyncio.wait_for(self.queue.get(), self._POLL_TIMEOUT)
            except asyncio.TimeoutError:
                continue  # nothing queued yet — re-check `running`
            try:
                await consumer_fn(item)
            except Exception as e:
                print(f"消费者异常: {e}")
                break
            finally:
                self.queue.task_done()

    def stop(self):
        """Ask every worker loop to exit at its next poll."""
        self.running = False

二、线程池模式

1. 基于concurrent.futures的实现

线程池模式通过预先创建一组工作线程来处理并发任务,避免了频繁创建和销毁线程的开销。Python的concurrent.futures模块提供了强大的线程池实现,但在实际应用中,通常需要对其进行扩展以满足特定需求。

以下代码展示了一个增强型线程池的实现:

import concurrent
import threading
from concurrent.futures import ThreadPoolExecutor, Future
from typing import Callable, List, Any


class EnhancedThreadPool:
    """ThreadPoolExecutor wrapper that tracks futures, results and a
    running task count, protected by a single lock."""

    def __init__(self, max_workers: int = None, thread_name_prefix: str = ""):
        """Initialize the enhanced thread pool.

        Args:
            max_workers: forwarded to ThreadPoolExecutor (None = default).
            thread_name_prefix: prefix applied to worker thread names.
        """
        self.executor = ThreadPoolExecutor(
            max_workers=max_workers,
            thread_name_prefix=thread_name_prefix
        )
        self.futures: List[Future] = []
        self.results = []
        self._lock = threading.Lock()
        self.task_count = 0

    def submit_task(self, fn: Callable, *args, **kwargs) -> Future:
        """Submit a task to the pool and track its future."""
        future = self.executor.submit(self._task_wrapper, fn, *args, **kwargs)
        with self._lock:
            self.futures.append(future)
            self.task_count += 1
        return future

    def _task_wrapper(self, fn: Callable, *args, **kwargs):
        """Run `fn`, recording its result; re-raise on failure so the
        future also carries the exception."""
        try:
            result = fn(*args, **kwargs)
            with self._lock:
                self.results.append(result)
            return result
        except Exception as e:
            print(f"任务执行异常: {e}")
            raise

    def wait_for_completion(self, timeout: float = None) -> bool:
        """Block until all tracked tasks finish, or the timeout expires.

        Returns True when every tracked future completed. Completed
        futures are pruned from the tracking list in either case.

        Bug fix: the original read `self.futures` without the lock and
        referenced `not_done` in a `finally` clause where it could be
        unbound if `wait` raised.
        """
        with self._lock:
            pending = list(self.futures)
        done, not_done = concurrent.futures.wait(
            pending,
            timeout=timeout,
            return_when=concurrent.futures.ALL_COMPLETED
        )
        with self._lock:
            self.futures = list(not_done)
        return len(not_done) == 0

    def get_results(self) -> List[Any]:
        """Return a snapshot of the results collected so far."""
        with self._lock:
            return self.results.copy()

    def shutdown(self, wait: bool = True):
        """Shut down the underlying executor."""
        self.executor.shutdown(wait=wait)

2. 动态线程池

在某些场景下,需要根据系统负载动态调整线程池的大小。

以下实现展示了一个可以根据任务队列长度和系统资源使用情况自动调整线程数量的动态线程池:

import threading
import time
from queue import Queue
from typing import Callable

import psutil


class DynamicThreadPool:
    """Thread pool that grows and shrinks with queue backlog and CPU load."""

    def __init__(self, min_workers: int = 2, max_workers: int = 10,
                 queue_size: int = 100):
        """Start the resource monitor and the initial worker threads.

        Args:
            min_workers: lower bound on the worker count.
            max_workers: upper bound on the worker count.
            queue_size: capacity of the shared task queue.
        """
        self.min_workers = min_workers
        self.max_workers = max_workers
        self.task_queue = Queue(maxsize=queue_size)
        self.workers = []
        self.active = True
        self.lock = threading.Lock()
        # Event lets shutdown() wake the monitor immediately instead of
        # waiting out the full 5 s sleep.
        self._stop_event = threading.Event()

        # Start the monitoring thread.
        self.monitor = threading.Thread(target=self._monitor_resources)
        self.monitor.start()

        # Start the initial workers.
        self._adjust_worker_count(min_workers)

    def _monitor_resources(self):
        """Periodically resize the pool based on backlog and CPU usage."""
        while self.active:
            cpu_percent = psutil.cpu_percent()
            queue_size = self.task_queue.qsize()
            current_workers = len(self.workers)

            if queue_size > current_workers * 2 and cpu_percent < 80:
                # Backlog with spare CPU: scale up (bounded by max_workers).
                self._adjust_worker_count(min(current_workers + 2, self.max_workers))
            elif queue_size < current_workers and current_workers > self.min_workers:
                # Little work queued: scale down (bounded by min_workers).
                self._adjust_worker_count(max(current_workers - 1, self.min_workers))

            self._stop_event.wait(5)  # monitoring interval, interruptible

    def _adjust_worker_count(self, target_count: int):
        """Grow or shrink the worker set toward `target_count`."""
        with self.lock:
            current_count = len(self.workers)
            if target_count > current_count:
                # Spawn additional workers.
                for _ in range(target_count - current_count):
                    worker = threading.Thread(target=self._worker_loop)
                    worker.start()
                    self.workers.append(worker)
            elif target_count < current_count:
                # Enqueue poison pills so surplus workers exit.
                for _ in range(current_count - target_count):
                    self.task_queue.put(None)

    def _worker_loop(self):
        """Worker main loop: run queued tasks until told to exit."""
        # Bug fix: the original caught `Queue.Empty`, but the `Queue`
        # class has no `Empty` attribute (it lives on the `queue`
        # module), so every idle timeout raised AttributeError. The
        # module-level import only brings in `Queue`, hence this local
        # import.
        from queue import Empty

        while self.active:
            try:
                task = self.task_queue.get(timeout=1)
            except Empty:
                continue
            if task is None:  # poison pill: this worker should exit
                with self.lock:
                    self.workers.remove(threading.current_thread())
                break

            func, args, kwargs = task
            try:
                func(*args, **kwargs)
            except Exception as e:
                print(f"任务执行异常: {e}")
            finally:
                self.task_queue.task_done()

    def submit(self, func: Callable, *args, **kwargs):
        """Enqueue a task; raises RuntimeError after shutdown."""
        if not self.active:
            raise RuntimeError("线程池已关闭")
        self.task_queue.put((func, args, kwargs))

    def shutdown(self, wait: bool = True):
        """Stop accepting work, then wait for workers and the monitor.

        Bug fix: iterate a snapshot of `self.workers`, since poison-pilled
        workers remove themselves from the list while we join.
        """
        self.active = False
        self._stop_event.set()  # wake the monitor immediately
        if wait:
            for worker in list(self.workers):
                worker.join()
        self.monitor.join()

三、工作者模式

1. 多阶段工作者模式

在复杂的并发系统中,任务处理常常需要经过多个阶段。多阶段工作者模式通过将任务处理分解为多个独立的阶段,每个阶段由专门的工作者处理,从而提高系统的处理效率和可维护性:

import queue
import threading
from dataclasses import dataclass
from typing import Any, Optional, List, Callable


@dataclass
class Task:
    """Unit of work flowing through the multi-stage worker pipeline."""
    # Identifier of the task, chosen by the submitter.
    id: str
    # Arbitrary payload handed to each stage's worker function.
    data: Any
    # Name of the pipeline stage the task is currently in.
    stage: str = 'initial'
    # Output of the last completed stage (None until processed).
    result: Optional[Any] = None
    # First exception raised by a stage worker, if any.
    error: Optional[Exception] = None


class MultiStageWorker:
    """Pipeline of worker threads: each stage has its own queue and
    worker set; finished (or failed) tasks land on `results`.

    Type annotations referring to `Task` are written as strings so this
    class does not depend on definition order at import time.
    """

    def __init__(self, stages: List[str]):
        """Create one queue per stage, in pipeline order."""
        self.stages = stages
        self.queues = {
            stage: queue.Queue() for stage in stages
        }
        self.workers = {}
        self.active = True
        self.results = queue.Queue()

    def add_worker(self, stage: str, worker_fn: 'Callable[[Task], Task]',
                   num_workers: int = 1):
        """Attach `num_workers` threads running `worker_fn` to a stage.

        Bug fix: repeated calls for the same stage used to overwrite the
        previous thread list (those threads were then never joined);
        workers are now accumulated with setdefault.
        """
        if stage not in self.stages:
            raise ValueError(f"未知的处理阶段: {stage}")

        stage_workers = self.workers.setdefault(stage, [])
        for _ in range(num_workers):
            worker = threading.Thread(
                target=self._worker_loop,
                args=(stage, worker_fn),
                daemon=True,  # don't keep the process alive if shutdown() is missed
            )
            worker.start()
            stage_workers.append(worker)

    def _worker_loop(self, stage: str, worker_fn: 'Callable[[Task], Task]'):
        """Consume tasks for `stage`, forwarding output to the next stage."""
        while self.active:
            try:
                task = self.queues[stage].get(timeout=1)
            except queue.Empty:
                continue
            try:
                processed_task = worker_fn(task)
                next_stage_idx = self.stages.index(stage) + 1

                if next_stage_idx < len(self.stages):
                    # Hand off to the next stage in the pipeline.
                    next_stage = self.stages[next_stage_idx]
                    processed_task.stage = next_stage
                    self.queues[next_stage].put(processed_task)
                else:
                    # Final stage: publish the finished task.
                    self.results.put(processed_task)
            except Exception as e:
                # Record the failure and surface the task immediately.
                task.error = e
                self.results.put(task)
            finally:
                self.queues[stage].task_done()

    def submit_task(self, task: 'Task'):
        """Enqueue a task at the first pipeline stage."""
        if not self.active:
            raise RuntimeError("系统已关闭")
        self.queues[self.stages[0]].put(task)

    def get_result(self, timeout: Optional[float] = None) -> 'Task':
        """Block until a finished (or failed) task is available."""
        return self.results.get(timeout=timeout)

    def shutdown(self):
        """Stop all workers and wait for them to exit."""
        self.active = False
        for workers in self.workers.values():
            for worker in workers:
                worker.join()

2. 事件驱动工作者模式

事件驱动工作者模式结合了事件驱动编程和工作者模式的优点,特别适合处理异步任务流。在这种模式中,工作者通过订阅事件来接收任务,完成后通过发布事件来传递结果。这种方式能够实现更松散的组件耦合,提高系统的可扩展性。

以下代码展示了一个完整的事件驱动工作者系统的实现:

import asyncio
import uuid
from dataclasses import dataclass
from typing import Dict, Set, Callable, Any


from dataclasses import field  # needed for per-instance default factories


@dataclass
class WorkEvent:
    """Event routed between workers over the event bus."""
    # Discriminator used to match the event to subscribed handlers.
    event_type: str
    # Arbitrary event payload.
    payload: Any
    # Id of the worker that emitted the event, if any.
    source_id: str = None
    # Id of the addressee worker; None means broadcast to everyone.
    target_id: str = None
    # Bug fix: the original default was `str(uuid.uuid4())`, which is
    # evaluated ONCE at class-definition time, so every event shared the
    # same correlation id. A default_factory generates a fresh id per
    # instance.
    correlation_id: str = field(default_factory=lambda: str(uuid.uuid4()))

class EventDrivenWorker:
    """Worker that pulls events from a private queue and dispatches them
    to subscribed handlers. (The `WorkEvent` annotation is a string so
    this class does not depend on definition order at import time.)"""

    def __init__(self, worker_id: str):
        """Set up the handler registry and the inbound event queue."""
        self.worker_id = worker_id
        self.event_handlers: Dict[str, Set[Callable]] = {}
        self.event_queue = asyncio.Queue()
        self.running = False

    async def start(self):
        """Run the event loop until `running` is cleared."""
        self.running = True
        await self._event_loop()

    def subscribe(self, event_type: str, handler: Callable):
        """Register `handler` for events of `event_type`."""
        if event_type not in self.event_handlers:
            self.event_handlers[event_type] = set()
        self.event_handlers[event_type].add(handler)

    async def publish(self, event: 'WorkEvent'):
        """Deliver an event to this worker's queue."""
        await self.event_queue.put(event)

    async def _event_loop(self):
        """Main loop: pull events and run all matching handlers.

        Bug fix: `task_done()` is now guaranteed via `finally`, so a
        failing handler can no longer leave `event_queue.join()` hanging.
        """
        while self.running:
            event = await self.event_queue.get()
            try:
                handlers = self.event_handlers.get(event.event_type, ())
                await asyncio.gather(
                    *[handler(event) for handler in handlers]
                )
            except Exception as e:
                print(f"事件处理异常: {e}")
            finally:
                self.event_queue.task_done()

class EventDrivenSystem:
    """Routes events from a central bus to registered workers. (Worker
    and event annotations are strings so this class does not depend on
    definition order at import time.)"""

    def __init__(self):
        """Initialize the worker registry and the event bus."""
        self.workers: Dict[str, 'EventDrivenWorker'] = {}
        self.event_bus = asyncio.Queue()
        self.running = False

    async def add_worker(self, worker: 'EventDrivenWorker'):
        """Register a worker and start its event loop in the background."""
        self.workers[worker.worker_id] = worker
        asyncio.create_task(worker.start())

    async def start(self):
        """Run the dispatcher until `running` is cleared."""
        self.running = True
        await self._event_dispatcher()

    async def _event_dispatcher(self):
        """Forward bus events to their target worker, or broadcast.

        Bug fix: `task_done()` now runs in `finally`, so a failing
        delivery cannot leave `event_bus.join()` hanging.
        """
        while self.running:
            event = await self.event_bus.get()
            try:
                if event.target_id:
                    if event.target_id in self.workers:
                        await self.workers[event.target_id].publish(event)
                else:
                    # Broadcast the event to every registered worker.
                    await asyncio.gather(
                        *[worker.publish(event) for worker in self.workers.values()]
                    )
            except Exception as e:
                print(f"事件分发异常: {e}")
            finally:
                self.event_bus.task_done()

    async def publish_event(self, event: 'WorkEvent'):
        """Put an event on the system bus."""
        await self.event_bus.put(event)

    def shutdown(self):
        """Cooperatively stop the dispatcher and all workers."""
        self.running = False
        for worker in self.workers.values():
            worker.running = False

四、读写锁模式

1. 基本读写锁实现

读写锁模式允许多个读操作同时进行,但写操作需要独占访问。这种模式在处理共享资源时特别有用,能够提高并发读取的效率。

以下代码展示了一个线程安全的读写锁实现:

import threading
from typing import Optional
import time

class ReadWriteLock:
    """Readers-writer lock: many readers may hold it concurrently,
    writers get exclusive access.

    Implementation: the first reader acquires `_write_lock` on behalf of
    the whole reader group and the last reader out releases it;
    `_read_lock` is a short-lived mutex protecting `_read_count`.
    """

    def __init__(self):
        """Initialize the underlying locks and counters."""
        self._read_lock = threading.Lock()
        self._write_lock = threading.Lock()
        self._read_count = 0
        self.read_waiting = 0   # readers currently inside acquire_read
        self.write_waiting = 0  # writers currently inside acquire_write

    @staticmethod
    def _to_lock_timeout(timeout: Optional[float]) -> float:
        """Translate the public API's `None` (wait forever) into the -1
        sentinel `threading.Lock.acquire` expects.

        Bug fix: the original passed `timeout=None` straight through,
        and `Lock.acquire(timeout=None)` raises TypeError — so the
        default call path always crashed.
        """
        return -1 if timeout is None else timeout

    def acquire_read(self, timeout: Optional[float] = None) -> bool:
        """Acquire the lock for reading.

        Returns True on success, False if the timeout expired.
        """
        start_time = time.monotonic()
        self.read_waiting += 1
        try:
            # Serialize access to the reader counter.
            if not self._read_lock.acquire(timeout=self._to_lock_timeout(timeout)):
                return False
            try:
                if self._read_count == 0:
                    # First reader claims the write lock for the group,
                    # with whatever time budget remains.
                    remaining = None
                    if timeout is not None:
                        elapsed = time.monotonic() - start_time
                        remaining = max(0, timeout - elapsed)
                    if not self._write_lock.acquire(timeout=self._to_lock_timeout(remaining)):
                        # Bug fix: the original also released `_read_lock`
                        # here and then released it AGAIN in `finally`,
                        # raising RuntimeError on the timeout path.
                        return False
                self._read_count += 1
                return True
            finally:
                self._read_lock.release()
        finally:
            self.read_waiting -= 1

    def release_read(self):
        """Release a read hold; the last reader frees the write lock."""
        with self._read_lock:
            self._read_count -= 1
            if self._read_count == 0:
                self._write_lock.release()

    def acquire_write(self, timeout: Optional[float] = None) -> bool:
        """Acquire the lock for exclusive writing.

        Returns True on success, False if the timeout expired.
        """
        self.write_waiting += 1
        try:
            return self._write_lock.acquire(timeout=self._to_lock_timeout(timeout))
        finally:
            self.write_waiting -= 1

    def release_write(self):
        """Release exclusive write access."""
        self._write_lock.release()

class ReadWriteLockContext:
    """Context manager over a ReadWriteLock.

    Pass `write=True` for exclusive (writer) access; the matching
    release is issued automatically when the `with` block exits.
    """

    def __init__(self, lock: ReadWriteLock, write: bool = False):
        self.lock = lock
        self.write = write

    def __enter__(self):
        acquire = self.lock.acquire_write if self.write else self.lock.acquire_read
        acquire()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        release = self.lock.release_write if self.write else self.lock.release_read
        release()

2. 可升级的读写锁

在某些场景下,需要将读锁升级为写锁。以下实现展示了一个支持锁升级的读写锁,它能够在保证数据一致性的同时提供更灵活的锁操作:

import threading


class UpgradeableReadWriteLock:
    """Asyncio readers-writer lock with read-to-write upgrade support.

    Bug fix: the original mixed `threading.Lock` with `async with` and
    `await lock.acquire()`, which fails at runtime (threading locks are
    not async context managers and their `acquire()` is not awaitable).
    All three locks are now `asyncio.Lock`. The original sync
    `__enter__`/`__exit__` pair called async methods without awaiting
    them (a silent no-op); it is replaced by a real async context
    manager (`async with`) that takes a plain read hold.

    NOTE(review): as in the original design, a reader that blocks on
    `_write_lock` while holding `_read_lock` can deadlock an in-flight
    upgrade/downgrade — confirm usage patterns before relying on this
    under writer contention.
    """

    def __init__(self):
        """Create the asyncio locks and counters."""
        self._read_lock = asyncio.Lock()     # guards _read_count
        self._write_lock = asyncio.Lock()    # held by the reader group or one writer
        self._upgrade_lock = asyncio.Lock()  # at most one upgradeable reader
        self._read_count = 0
        self._upgrade_count = 0

    async def acquire_read(self):
        """Acquire shared (read) access."""
        async with self._read_lock:
            self._read_count += 1
            if self._read_count == 1:
                # First reader takes the write lock for the group.
                await self._write_lock.acquire()

    async def release_read(self):
        """Release shared access; the last reader frees the write lock."""
        async with self._read_lock:
            self._read_count -= 1
            if self._read_count == 0:
                self._write_lock.release()

    async def acquire_upgradeable_read(self):
        """Acquire read access that may later be upgraded to write."""
        await self._upgrade_lock.acquire()
        await self.acquire_read()

    async def upgrade_to_write(self):
        """Upgrade the upgradeable read into exclusive write access."""
        # Step out of the reader group, then take the write lock alone.
        async with self._read_lock:
            self._read_count -= 1
            if self._read_count == 0:
                self._write_lock.release()
        await self._write_lock.acquire()
        self._upgrade_count += 1

    async def downgrade_to_read(self):
        """Turn the write hold back into a read hold.

        Bug fix: the original released `_write_lock` unconditionally
        after rejoining the reader group, leaving the group without its
        lock (and making a later `release_read()` raise). We keep
        holding `_write_lock` when we become the only reader, since that
        is exactly the group's hold.
        """
        self._upgrade_count -= 1
        async with self._read_lock:
            self._read_count += 1
            if self._read_count > 1:
                # Other readers already hold the write lock for the group.
                self._write_lock.release()
        if self._upgrade_count == 0:
            self._upgrade_lock.release()

    async def __aenter__(self):
        """Async context manager entry: plain read hold."""
        await self.acquire_read()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit: release the read hold."""
        await self.release_read()

总结

本文详细介绍了Python并发编程中的几种重要设计模式。通过生产者-消费者模式,可以有效地解耦任务的生产和消费过程;线程池模式高效地管理并复用线程资源;工作者模式则提供了一种结构化的方式来处理复杂的任务流程;而读写锁模式则为共享资源的并发访问提供了细粒度的控制。这些设计模式不仅提供了解决并发问题的通用方案,还能够显著提升程序的性能和可维护性。在实际应用中,开发者可以根据具体需求选择合适的模式或将多种模式组合使用,从而构建出高效、可靠的并发系统。

相关文章

一文了解 Python 中的线程池:高效并发编程的秘密武器

在 Python 编程的广阔天地里,我们常常会遇到需要同时处理多个任务的场景。想象一下,你正在开发一个网络爬虫,需要同时从多个网页上抓取数据;又或者你在处理大量的文件,需要同时对不同的文件进行读取、分...

24-3-Python多线程-线程队列-queue模块

3-1-概念queue模块提供了多线程编程中的队列实现,队列是线程安全的数据结构,能在多线程环境下安全地进行数据交换。3-2-queue 的队列类型Queue(先进先出队列)、LifoQueue(后进...

一分钟快速部署Django应用(django部署到linux)

在Python Web开发方面,Django的用户人数应该是最多的。很多开发者在完成应用开发之后,都会面临线上部署Django应用这个头疼的问题。当初我在部署“编程派”网站时,就碰到了很多障碍,折腾了...

使用Python进行并发编程(python 并发编程)

让计算机程序并发的运行是一个经常被讨论的话题,今天我想讨论一下Python下的各种并发方式。并发方式线程(Thread)多线程几乎是每一个程序猿在使用每一种语言时都会首先想到用于解决并发的工具(JS程...

从零构建Python响应式编程的核心原理与实现方法

响应式编程是一种以数据流和变化传播为核心的编程范式,它允许我们以声明式的方式处理异步数据流。在当今复杂的应用环境中,响应式编程正逐渐成为处理事件驱动型应用、实时数据处理以及交互式用户界面的重要方法。本...