30天学会Python编程:16. Python常用标准库使用教程
16.1 collections模块
16.1.1 高级数据结构
16.1.2 示例
from collections import defaultdict, Counter, deque
# 默认字典
word_counts = defaultdict(int)
for word in ['apple', 'banana', 'apple']:
word_counts[word] += 1
# 计数器
inventory = Counter(apple=10, banana=5)
inventory.update(['apple', 'orange'])
# 双端队列
d = deque(maxlen=3)
d.extend([1,2,3])
d.appendleft(0) # 自动移除最右端元素
16.2 itertools模块
16.2.1 迭代器工具分类
表16-1 itertools主要功能
类别 | 函数 | 描述 |
无限迭代器 | count() | 数字序列 |
cycle() | 循环迭代 | |
repeat() | 重复元素 | |
组合迭代器 | product() | 笛卡尔积 |
permutations() | 排列 | |
combinations() | 组合 | |
实用工具 | chain() | 连接迭代器 |
groupby() | 分组操作 | |
zip_longest() | 长压缩 |
16.2.2 示例
import itertools
# 分组操作
data = sorted([('A', 1), ('B', 2), ('A', 3)], key=lambda x: x[0])
for key, group in itertools.groupby(data, key=lambda x: x[0]):
print(f"{key}: {list(group)}")
# 排列组合
print(list(itertools.permutations('ABC', 2)))
16.3 functools模块
16.3.1 核心功能解析
from functools import partial, lru_cache, reduce
# 偏函数
base_two = partial(int, base=2)
print(base_two('1010')) # 10
# 缓存装饰器
@lru_cache(maxsize=128)
def fibonacci(n):
return n if n < 2 else fibonacci(n-1) + fibonacci(n-2)
# 累积计算
product = reduce(lambda x,y: x*y, range(1,6)) # 120
16.3.2 singledispatch
from functools import singledispatch
@singledispatch
def process(data):
raise NotImplementedError("未实现类型")
@process.register(str)
def _(text):
return f"处理字符串: {text.lower()}"
@process.register(int)
def _(number):
return f"处理数字: {number**2}"
print(process("Hello")) # 处理字符串: hello
print(process(5)) # 处理数字: 25
16.4 contextlib模块
16.4.1 上下文管理器工具
from contextlib import contextmanager, suppress, redirect_stdout
import io
# 自定义上下文管理器
@contextmanager
def timer():
import time
start = time.time()
try:
yield
finally:
print(f"耗时: {time.time()-start:.2f}s")
with timer():
sum(range(1000000))
# 忽略指定异常
with suppress(FileNotFoundError):
open('nonexist.txt')
# 重定向输出
f = io.StringIO()
with redirect_stdout(f):
print("Hello")
print(f"捕获的输出: {f.getvalue()}")
16.5 日期时间处理
16.5.1 datetime深度使用
from datetime import datetime, timedelta, timezone
# 时区处理
utc_now = datetime.now(timezone.utc)
beijing_time = utc_now.astimezone(timezone(timedelta(hours=8)))
# 时间运算
next_week = datetime.now() + timedelta(weeks=1)
# 精确计时
start = datetime.now()
# 执行操作...
elapsed = datetime.now() - start
16.5.2 日历操作
import calendar
# 生成月历
cal = calendar.HTMLCalendar()
html_cal = cal.formatmonth(2023, 7)
# 日期计算
print(calendar.monthrange(2023, 2)) # (2, 28) 周起始和天数
16.6 正则表达式
16.6.1 re模块高级用法
import re
# 命名捕获组
text = "Date: 2023-07-20"
pattern = r"Date: (?P<year>\d{4})-(?P<month>\d{2})-(?P<day>\d{2})"
match = re.search(pattern, text)
if match:
print(match.groupdict()) # {'year': '2023', 'month': '07', 'day': '20'}
# 编译正则提高性能
email_re = re.compile(r'[\w\.-]+@[\w\.-]+')
emails = email_re.findall("Contact: a@b.com, x@y.org")
16.7 应用举例
案例1:日志分析
from collections import Counter
import re
from datetime import datetime
def analyze_logs(log_file):
"""分析日志文件"""
ip_pattern = re.compile(r'(\d+\.\d+\.\d+\.\d+)')
date_pattern = re.compile(r'\[(\d{2}/\w{3}/\d{4})')
status_pattern = re.compile(r'HTTP/\d\.\d" (\d{3})')
ip_counter = Counter()
status_counter = Counter()
daily_requests = {}
with open(log_file) as f:
for line in f:
# 提取IP
ip_match = ip_pattern.search(line)
if ip_match:
ip_counter[ip_match.group(1)] += 1
# 提取日期
date_match = date_pattern.search(line)
if date_match:
date_str = date_match.group(1)
date = datetime.strptime(date_str, '%d/%b/%Y').date()
daily_requests[date] = daily_requests.get(date, 0) + 1
# 提取状态码
status_match = status_pattern.search(line)
if status_match:
status_counter[status_match.group(1)] += 1
return {
'top_ips': ip_counter.most_common(5),
'status_codes': dict(status_counter),
'daily_requests': sorted(daily_requests.items())
}
案例2:数据批处理
from itertools import islice, chain
from functools import partial
import csv
def batch_processor(data, process_func, batch_size=100):
"""批量数据处理器"""
iterator = iter(data)
while True:
batch = list(islice(iterator, batch_size))
if not batch:
break
yield process_func(batch)
def process_users(users):
"""用户数据处理函数"""
return [{'name': u['name'].upper(), 'age': int(u['age'])+1} for u in users]
def read_csv(file_path):
"""CSV文件读取器"""
with open(file_path) as f:
reader = csv.DictReader(f)
yield from reader
# 构建处理管道
data_pipeline = chain(
read_csv('users.csv'),
partial(batch_processor, process_func=process_users),
lambda batches: (user for batch in batches for user in batch)
)
# 执行处理
for user in data_pipeline:
print(user)
16.8 知识图谱
持续更新Python编程学习日志与技巧,敬请关注!
#编程# #学习# #在头条记录我的2025# #python#