Design Patterns for Concurrent Programming in Python
In modern software development, concurrent programming has become a key technique for improving application performance and user experience. With the spread of multi-core processors and the widespread adoption of distributed systems, mastering concurrency design patterns matters more than ever. This article takes an in-depth look at the design patterns most commonly used in Python concurrent programming, including the producer-consumer pattern, the thread pool pattern, the worker pattern, and the read-write lock pattern. By studying these patterns, developers can better solve the common problems of concurrent programming and build efficient, maintainable concurrent systems.
I. The Producer-Consumer Pattern
1. Basic Implementation
The producer-consumer pattern is one of the most fundamental and widely used design patterns in concurrent programming. By decoupling the production of tasks from their consumption, it lets a system adjust production and consumption rates independently, which improves flexibility and scalability. In Python, queue.Queue provides thread-safe data transfer, while threads or coroutines handle the production and consumption logic.
The following code shows a typical implementation of the producer-consumer pattern:
import queue
import threading
import time
import random
from typing import List, Optional

class ProductionSystem:
    def __init__(self, queue_size: int = 10):
        """Initialize the production system."""
        self.queue = queue.Queue(maxsize=queue_size)
        self.should_stop = threading.Event()
        self.producers: List[threading.Thread] = []
        self.consumers: List[threading.Thread] = []

    def add_producer(self, producer_fn, name: Optional[str] = None):
        """Add a producer thread."""
        producer = threading.Thread(
            target=self._producer_wrapper,
            args=(producer_fn,),
            name=name or f"Producer-{len(self.producers)}"
        )
        self.producers.append(producer)
        producer.start()

    def add_consumer(self, consumer_fn, name: Optional[str] = None):
        """Add a consumer thread."""
        consumer = threading.Thread(
            target=self._consumer_wrapper,
            args=(consumer_fn,),
            name=name or f"Consumer-{len(self.consumers)}"
        )
        self.consumers.append(consumer)
        consumer.start()

    def _producer_wrapper(self, producer_fn):
        """Producer wrapper: handles exceptions and the stop signal."""
        while not self.should_stop.is_set():
            try:
                item = producer_fn()
                self.queue.put(item, timeout=1)
            except queue.Full:
                continue
            except Exception as e:
                print(f"Producer error: {e}")
                break

    def _consumer_wrapper(self, consumer_fn):
        """Consumer wrapper: handles exceptions and the stop signal."""
        while not self.should_stop.is_set():
            try:
                item = self.queue.get(timeout=1)
                consumer_fn(item)
                self.queue.task_done()
            except queue.Empty:
                continue
            except Exception as e:
                print(f"Consumer error: {e}")
                break

    def stop(self):
        """Stop all producers and consumers."""
        self.should_stop.set()
        for thread in self.producers + self.consumers:
            thread.join()

def producer_fn():
    time.sleep(random.random())
    return random.randint(1, 100)

def consumer_fn(item):
    time.sleep(random.random())
    print(f"Processing item: {item}")

system = ProductionSystem(queue_size=5)
system.add_producer(producer_fn)
system.add_consumer(consumer_fn)
time.sleep(10)  # run for 10 seconds
system.stop()
Sample output:
Processing item: 11
Processing item: 54
Processing item: 28
Processing item: 54
Processing item: 64
Processing item: 49
Processing item: 2
Processing item: 30
Processing item: 73
Processing item: 91
Processing item: 86
Processing item: 48
Processing item: 19
Processing item: 91
Processing item: 91
Processing item: 80
Processing item: 24
Processing item: 76
Processing item: 98
Processing item: 71
2. Asynchronous Implementation
In modern Python applications, the asynchronous programming model is often a better fit for IO-bound workloads.
The following producer-consumer implementation uses asyncio and can handle a large number of concurrent operations more efficiently:
import asyncio
import random
from asyncio import Queue
from typing import Callable, Coroutine, Any

class AsyncProductionSystem:
    def __init__(self, queue_size: int = 10):
        """Initialize the asynchronous production system."""
        self.queue = Queue(maxsize=queue_size)
        self.producers = []
        self.consumers = []
        self.running = False

    async def start(self, num_producers: int, num_consumers: int,
                    producer_fn: Callable[[], Coroutine[Any, Any, Any]],
                    consumer_fn: Callable[[Any], Coroutine[Any, Any, None]]):
        """Start the producer-consumer system."""
        self.running = True
        # Create producer and consumer tasks
        producer_tasks = [
            asyncio.create_task(self._producer_loop(producer_fn))
            for _ in range(num_producers)
        ]
        consumer_tasks = [
            asyncio.create_task(self._consumer_loop(consumer_fn))
            for _ in range(num_consumers)
        ]
        # Wait for all tasks to finish
        await asyncio.gather(*producer_tasks, *consumer_tasks)

    async def _producer_loop(self, producer_fn):
        """Producer loop."""
        while self.running:
            try:
                item = await producer_fn()
                await self.queue.put(item)
                print(f"Produced: {item}")
            except Exception as e:
                print(f"Producer exception: {e}")
                break

    async def _consumer_loop(self, consumer_fn):
        """Consumer loop."""
        while self.running or not self.queue.empty():
            try:
                item = await self.queue.get()
                await consumer_fn(item)
                self.queue.task_done()
                print(f"Consumed: {item}")
            except Exception as e:
                print(f"Consumer exception: {e}")
                break

    def stop(self):
        """Stop the system."""
        self.running = False
        print("Stop signal sent, waiting for outstanding tasks to finish...")

    async def wait_until_done(self):
        """Wait until every item put into the queue has been processed."""
        await self.queue.join()
        print("All tasks completed")

async def producer_fn():
    """Producer coroutine: simulates generating data."""
    await asyncio.sleep(random.random())
    return random.randint(1, 100)

async def consumer_fn(item):
    """Consumer coroutine: simulates processing data."""
    await asyncio.sleep(random.random())
    print(f"Processing item: {item}")

async def main():
    """Entry point: run the producer-consumer system."""
    system = AsyncProductionSystem(queue_size=5)
    # start() only returns when every loop exits, so run it as a background task
    run_task = asyncio.create_task(
        system.start(num_producers=3, num_consumers=2,
                     producer_fn=producer_fn, consumer_fn=consumer_fn)
    )
    await asyncio.sleep(10)  # run for 10 seconds
    system.stop()
    await system.wait_until_done()
    run_task.cancel()  # unblock loops still waiting on queue.get()/put()
# Run the program
asyncio.run(main())
II. The Thread Pool Pattern
1. An Implementation Based on concurrent.futures
The thread pool pattern handles concurrent tasks with a pre-created group of worker threads, avoiding the overhead of repeatedly creating and destroying threads. Python's concurrent.futures module ships a capable thread pool, but real applications often need to extend it for specific requirements.
The following code shows an enhanced thread pool implementation:
import concurrent.futures
import threading
from concurrent.futures import ThreadPoolExecutor, Future
from typing import Callable, List, Any

class EnhancedThreadPool:
    def __init__(self, max_workers: int = None, thread_name_prefix: str = ""):
        """Initialize the enhanced thread pool."""
        self.executor = ThreadPoolExecutor(
            max_workers=max_workers,
            thread_name_prefix=thread_name_prefix
        )
        self.futures: List[Future] = []
        self.results = []
        self._lock = threading.Lock()
        self.task_count = 0

    def submit_task(self, fn: Callable, *args, **kwargs) -> Future:
        """Submit a task to the pool."""
        future = self.executor.submit(self._task_wrapper, fn, *args, **kwargs)
        with self._lock:
            self.futures.append(future)
            self.task_count += 1
        return future

    def _task_wrapper(self, fn: Callable, *args, **kwargs):
        """Task wrapper: collects results and handles exceptions."""
        try:
            result = fn(*args, **kwargs)
            with self._lock:
                self.results.append(result)
            return result
        except Exception as e:
            print(f"Task raised an exception: {e}")
            raise

    def wait_for_completion(self, timeout: float = None) -> bool:
        """Wait for all submitted tasks to complete."""
        done, not_done = concurrent.futures.wait(
            self.futures,
            timeout=timeout,
            return_when=concurrent.futures.ALL_COMPLETED
        )
        with self._lock:
            self.futures = list(not_done)
        return len(not_done) == 0

    def get_results(self) -> List[Any]:
        """Return the results of completed tasks."""
        with self._lock:
            return self.results.copy()

    def shutdown(self, wait: bool = True):
        """Shut down the pool."""
        self.executor.shutdown(wait=wait)
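A minimal usage sketch for the enhanced pool above; the square function and the worker count are illustrative assumptions rather than part of the original example:

def square(x: int) -> int:
    return x * x  # stand-in workload, assumed for illustration

pool = EnhancedThreadPool(max_workers=4, thread_name_prefix="worker")
for i in range(10):
    pool.submit_task(square, i)
if pool.wait_for_completion(timeout=5):
    print(sorted(pool.get_results()))  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
pool.shutdown()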
2. A Dynamic Thread Pool
In some scenarios the pool size needs to adapt to the system load.
The following implementation shows a dynamic thread pool that automatically adjusts the number of worker threads based on the task queue length and system resource usage (it relies on the third-party psutil package):
import threading
import time
from queue import Queue, Empty
from typing import Callable
import psutil

class DynamicThreadPool:
    def __init__(self, min_workers: int = 2, max_workers: int = 10,
                 queue_size: int = 100):
        """Initialize the dynamic thread pool."""
        self.min_workers = min_workers
        self.max_workers = max_workers
        self.task_queue = Queue(maxsize=queue_size)
        self.workers = []
        self.active = True
        self.lock = threading.Lock()
        # Start the monitoring thread
        self.monitor = threading.Thread(target=self._monitor_resources)
        self.monitor.start()
        # Start the initial worker threads
        self._adjust_worker_count(min_workers)

    def _monitor_resources(self):
        """Monitor system resources and the task queue; adjust the thread count."""
        while self.active:
            cpu_percent = psutil.cpu_percent()
            queue_size = self.task_queue.qsize()
            current_workers = len(self.workers)
            if queue_size > current_workers * 2 and cpu_percent < 80:
                # Queue is backing up and CPU headroom remains: add threads
                self._adjust_worker_count(min(current_workers + 2, self.max_workers))
            elif queue_size < current_workers and current_workers > self.min_workers:
                # Few tasks pending: shrink the pool
                self._adjust_worker_count(max(current_workers - 1, self.min_workers))
            time.sleep(5)  # monitoring interval

    def _adjust_worker_count(self, target_count: int):
        """Adjust the number of worker threads."""
        with self.lock:
            current_count = len(self.workers)
            if target_count > current_count:
                # Add worker threads
                for _ in range(target_count - current_count):
                    worker = threading.Thread(target=self._worker_loop)
                    worker.start()
                    self.workers.append(worker)
            elif target_count < current_count:
                # Signal surplus workers to exit
                for _ in range(current_count - target_count):
                    self.task_queue.put(None)

    def _worker_loop(self):
        """Main loop of a worker thread."""
        while self.active:
            try:
                task = self.task_queue.get(timeout=1)
                if task is None:  # exit signal
                    self.task_queue.task_done()
                    with self.lock:
                        self.workers.remove(threading.current_thread())
                    break
                func, args, kwargs = task
                try:
                    func(*args, **kwargs)
                except Exception as e:
                    print(f"Task raised an exception: {e}")
                finally:
                    self.task_queue.task_done()
            except Empty:
                continue

    def submit(self, func: Callable, *args, **kwargs):
        """Submit a task to the pool."""
        if not self.active:
            raise RuntimeError("Thread pool has been shut down")
        self.task_queue.put((func, args, kwargs))

    def shutdown(self, wait: bool = True):
        """Shut down the pool."""
        self.active = False
        if wait:
            for worker in list(self.workers):
                worker.join()
            self.monitor.join()
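A possible way to exercise the dynamic pool; io_task and the task count are assumptions made for illustration, and the queue join simply waits for the backlog to drain before shutting down:

import random
import time

def io_task(task_id: int):
    time.sleep(random.uniform(0.1, 0.5))  # simulated IO-bound work
    print(f"Task {task_id} done")

pool = DynamicThreadPool(min_workers=2, max_workers=8)
for i in range(50):
    pool.submit(io_task, i)
pool.task_queue.join()  # block until every queued task has been processed
pool.shutdown()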
III. The Worker Pattern
1. The Multi-Stage Worker Pattern
In complex concurrent systems, processing a task often involves several stages. The multi-stage worker pattern splits processing into independent stages, each handled by dedicated workers, which improves both throughput and maintainability:
import queue
import threading
from dataclasses import dataclass
from typing import Any, Optional, List, Callable

@dataclass
class Task:
    """Task data class."""
    id: str
    data: Any
    stage: str = 'initial'
    result: Optional[Any] = None
    error: Optional[Exception] = None

class MultiStageWorker:
    def __init__(self, stages: List[str]):
        """Initialize the multi-stage worker system."""
        self.stages = stages
        self.queues = {
            stage: queue.Queue() for stage in stages
        }
        self.workers = {}
        self.active = True
        self.results = queue.Queue()

    def add_worker(self, stage: str, worker_fn: Callable[[Task], Task],
                   num_workers: int = 1):
        """Add workers for the given stage."""
        if stage not in self.stages:
            raise ValueError(f"Unknown processing stage: {stage}")
        self.workers[stage] = []
        for _ in range(num_workers):
            worker = threading.Thread(
                target=self._worker_loop,
                args=(stage, worker_fn)
            )
            worker.start()
            self.workers[stage].append(worker)

    def _worker_loop(self, stage: str, worker_fn: Callable[[Task], Task]):
        """Worker loop."""
        while self.active:
            try:
                task = self.queues[stage].get(timeout=1)
                try:
                    processed_task = worker_fn(task)
                    next_stage_idx = self.stages.index(stage) + 1
                    if next_stage_idx < len(self.stages):
                        # Hand the task to the next stage
                        next_stage = self.stages[next_stage_idx]
                        processed_task.stage = next_stage
                        self.queues[next_stage].put(processed_task)
                    else:
                        # Last stage: store the result
                        self.results.put(processed_task)
                except Exception as e:
                    task.error = e
                    self.results.put(task)
                self.queues[stage].task_done()
            except queue.Empty:
                continue

    def submit_task(self, task: Task):
        """Submit a task to the first stage."""
        if not self.active:
            raise RuntimeError("System has been shut down")
        self.queues[self.stages[0]].put(task)

    def get_result(self, timeout: Optional[float] = None) -> Task:
        """Get a finished task."""
        return self.results.get(timeout=timeout)

    def shutdown(self):
        """Shut down the system."""
        self.active = False
        for workers in self.workers.values():
            for worker in workers:
                worker.join()
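As a rough sketch of how the pipeline might be wired up, the stage names ("parse", "enrich") and stage functions below are hypothetical examples, not part of the original code:

def parse(task: Task) -> Task:
    task.result = str(task.data).strip()  # placeholder parsing step
    return task

def enrich(task: Task) -> Task:
    task.result = f"[{task.result}]"      # placeholder enrichment step
    return task

pipeline = MultiStageWorker(stages=["parse", "enrich"])
pipeline.add_worker("parse", parse, num_workers=2)
pipeline.add_worker("enrich", enrich, num_workers=1)
pipeline.submit_task(Task(id="t1", data="  hello  "))
print(pipeline.get_result(timeout=5).result)  # [hello]
pipeline.shutdown()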
2. The Event-Driven Worker Pattern
The event-driven worker pattern combines event-driven programming with the worker pattern and is well suited to asynchronous task flows. Workers receive work by subscribing to events and pass on results by publishing events, which keeps components loosely coupled and makes the system easier to extend.
The following code shows a complete event-driven worker system:
import asyncio
import uuid
from dataclasses import dataclass, field
from typing import Dict, Set, Callable, Any, Optional

@dataclass
class WorkEvent:
    """Work event data class."""
    event_type: str
    payload: Any
    source_id: Optional[str] = None
    target_id: Optional[str] = None
    # default_factory ensures each event gets its own correlation id
    correlation_id: str = field(default_factory=lambda: str(uuid.uuid4()))

class EventDrivenWorker:
    def __init__(self, worker_id: str):
        """Initialize the event-driven worker."""
        self.worker_id = worker_id
        self.event_handlers: Dict[str, Set[Callable]] = {}
        self.event_queue = asyncio.Queue()
        self.running = False

    async def start(self):
        """Start the worker."""
        self.running = True
        await self._event_loop()

    def subscribe(self, event_type: str, handler: Callable):
        """Subscribe to a specific event type."""
        if event_type not in self.event_handlers:
            self.event_handlers[event_type] = set()
        self.event_handlers[event_type].add(handler)

    async def publish(self, event: WorkEvent):
        """Publish an event onto this worker's queue."""
        await self.event_queue.put(event)

    async def _event_loop(self):
        """Main event-processing loop."""
        while self.running:
            try:
                event = await self.event_queue.get()
                if event.event_type in self.event_handlers:
                    handlers = self.event_handlers[event.event_type]
                    await asyncio.gather(
                        *[handler(event) for handler in handlers]
                    )
                self.event_queue.task_done()
            except Exception as e:
                print(f"Event handling exception: {e}")

class EventDrivenSystem:
    def __init__(self):
        """Initialize the event-driven system."""
        self.workers: Dict[str, EventDrivenWorker] = {}
        self.event_bus = asyncio.Queue()
        self.running = False

    async def add_worker(self, worker: EventDrivenWorker):
        """Register a worker with the system."""
        self.workers[worker.worker_id] = worker
        asyncio.create_task(worker.start())

    async def start(self):
        """Start the event system."""
        self.running = True
        await self._event_dispatcher()

    async def _event_dispatcher(self):
        """Event dispatcher."""
        while self.running:
            try:
                event = await self.event_bus.get()
                if event.target_id:
                    if event.target_id in self.workers:
                        await self.workers[event.target_id].publish(event)
                else:
                    # Broadcast the event to all workers
                    await asyncio.gather(
                        *[worker.publish(event) for worker in self.workers.values()]
                    )
                self.event_bus.task_done()
            except Exception as e:
                print(f"Event dispatch exception: {e}")

    async def publish_event(self, event: WorkEvent):
        """Publish an event onto the system bus."""
        await self.event_bus.put(event)

    def shutdown(self):
        """Shut down the system."""
        self.running = False
        for worker in self.workers.values():
            worker.running = False
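A small driver sketch, assuming a hypothetical "printer" worker and "log" event type; note that the loops block on queue.get(), so the sketch cancels the dispatcher task when it is done (asyncio.run cancels any remaining tasks on exit):

async def demo():
    system = EventDrivenSystem()
    worker = EventDrivenWorker("printer")

    async def on_log(event: WorkEvent):
        print(f"[{event.correlation_id[:8]}] {event.payload}")

    worker.subscribe("log", on_log)
    await system.add_worker(worker)
    dispatcher = asyncio.create_task(system.start())  # run the dispatcher in the background
    await system.publish_event(WorkEvent(event_type="log", payload="hello"))
    await asyncio.sleep(0.5)  # give the handlers a moment to run
    system.shutdown()
    dispatcher.cancel()       # the dispatcher is still blocked on event_bus.get()

asyncio.run(demo())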
IV. The Read-Write Lock Pattern
1. A Basic Read-Write Lock Implementation
The read-write lock pattern allows multiple read operations to proceed concurrently while write operations require exclusive access. It is especially useful for shared resources that are read far more often than they are written, because it raises concurrent read throughput.
The following code shows a thread-safe read-write lock implementation:
import threading
import time
from typing import Optional

class ReadWriteLock:
    def __init__(self):
        """Initialize the read-write lock."""
        self._read_lock = threading.Lock()
        self._write_lock = threading.Lock()
        self._read_count = 0
        self.read_waiting = 0
        self.write_waiting = 0

    def acquire_read(self, timeout: Optional[float] = None) -> bool:
        """Acquire the lock for reading."""
        start_time = time.monotonic()
        self.read_waiting += 1
        try:
            # Serialize access to the reader count (-1 means block indefinitely)
            if not self._read_lock.acquire(timeout=timeout if timeout is not None else -1):
                return False
            try:
                # The first reader must also take the write lock
                if self._read_count == 0:
                    remaining_timeout = -1
                    if timeout is not None:
                        elapsed = time.monotonic() - start_time
                        remaining_timeout = max(0, timeout - elapsed)
                    if not self._write_lock.acquire(timeout=remaining_timeout):
                        return False
                self._read_count += 1
                return True
            finally:
                self._read_lock.release()
        finally:
            self.read_waiting -= 1

    def release_read(self):
        """Release a read lock."""
        with self._read_lock:
            self._read_count -= 1
            if self._read_count == 0:
                self._write_lock.release()

    def acquire_write(self, timeout: Optional[float] = None) -> bool:
        """Acquire the lock for writing."""
        self.write_waiting += 1
        try:
            return self._write_lock.acquire(timeout=timeout if timeout is not None else -1)
        finally:
            self.write_waiting -= 1

    def release_write(self):
        """Release the write lock."""
        self._write_lock.release()

class ReadWriteLockContext:
    """Context manager for the read-write lock."""
    def __init__(self, lock: ReadWriteLock, write: bool = False):
        self.lock = lock
        self.write = write

    def __enter__(self):
        if self.write:
            self.lock.acquire_write()
        else:
            self.lock.acquire_read()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.write:
            self.lock.release_write()
        else:
            self.lock.release_read()
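A brief usage sketch; the config dictionary and helper functions are assumptions made for illustration:

config = {"mode": "fast"}
rw_lock = ReadWriteLock()

def read_config(key: str):
    with ReadWriteLockContext(rw_lock):              # shared, concurrent read access
        return config.get(key)

def update_config(key: str, value):
    with ReadWriteLockContext(rw_lock, write=True):  # exclusive write access
        config[key] = value

update_config("mode", "safe")
print(read_config("mode"))  # safe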
2. An Upgradeable Read-Write Lock
Some scenarios require upgrading a read lock to a write lock. The following implementation shows a read-write lock that supports lock upgrading, providing more flexible locking while preserving data consistency:
import threading

class UpgradeableReadWriteLock:
    """A read-write lock whose upgradeable read can be promoted to a write lock."""

    def __init__(self):
        """Initialize the upgradeable read-write lock."""
        self._cond = threading.Condition()
        self._readers = 0                      # number of active readers
        self._writer = False                   # True while a writer holds the lock
        self._upgraded = False                 # True while the upgradeable reader holds the write lock
        self._upgrade_lock = threading.Lock()  # at most one upgradeable reader at a time

    def acquire_read(self):
        """Acquire a plain read lock."""
        with self._cond:
            while self._writer:
                self._cond.wait()
            self._readers += 1

    def release_read(self):
        """Release a read lock."""
        with self._cond:
            self._readers -= 1
            if self._readers == 0:
                self._cond.notify_all()

    def acquire_write(self):
        """Acquire the write lock (exclusive access)."""
        with self._cond:
            while self._writer or self._readers > 0:
                self._cond.wait()
            self._writer = True

    def release_write(self):
        """Release the write lock."""
        with self._cond:
            self._writer = False
            self._cond.notify_all()

    def acquire_upgradeable_read(self):
        """Acquire a read lock that may later be upgraded to a write lock."""
        # Only one upgradeable reader is allowed; two concurrent upgraders
        # would deadlock waiting for each other's readers to drain.
        self._upgrade_lock.acquire()
        self.acquire_read()

    def release_upgradeable_read(self):
        """Release the upgradeable read lock."""
        self.release_read()
        self._upgrade_lock.release()

    def upgrade_to_write(self):
        """Upgrade the upgradeable read lock to a write lock."""
        with self._cond:
            self._readers -= 1                 # stop counting ourselves as a reader
            while self._writer or self._readers > 0:
                self._cond.wait()              # wait for the remaining readers to finish
            self._writer = True
            self._upgraded = True

    def downgrade_to_read(self):
        """Downgrade the write lock back to a read lock."""
        with self._cond:
            self._writer = False
            self._upgraded = False
            self._readers += 1
            self._cond.notify_all()

    def __enter__(self):
        """Enter the context as an upgradeable reader."""
        self.acquire_upgradeable_read()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Downgrade if still upgraded, then release the upgradeable read lock."""
        if self._upgraded:
            self.downgrade_to_read()
        self.release_upgradeable_read()
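A sketch of the check-then-write pattern the upgradeable lock is meant for; the cache dictionary and the get_or_compute helper are illustrative assumptions:

cache = {}
lock = UpgradeableReadWriteLock()

def get_or_compute(key: str):
    with lock:                            # enter as an upgradeable reader
        if key in cache:
            return cache[key]
        lock.upgrade_to_write()           # promote to exclusive access for the write
        cache[key] = f"value-for-{key}"   # placeholder computation
        lock.downgrade_to_read()
        return cache[key]

print(get_or_compute("alpha"))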
Summary
This article has walked through several important design patterns for concurrent programming in Python. The producer-consumer pattern decouples the production of work from its consumption; the thread pool pattern manages and reuses thread resources efficiently; the worker pattern gives complex task flows a clear structure; and the read-write lock pattern offers fine-grained control over concurrent access to shared resources. These patterns provide general solutions to common concurrency problems and can noticeably improve both performance and maintainability. In practice, developers can pick the pattern that fits the problem at hand, or combine several of them, to build efficient and reliable concurrent systems.