Python操作Kubernetes集群完全指南

liftword5个月前 (12-08)技术文章39

Python操作Kubernetes集群完全指南


目录

  1. 基础环境准备
  2. Python Kubernetes客户端介绍
  3. 连接Kubernetes集群
  4. Pod操作实战
  5. Deployment管理
  6. Service资源操作
  7. ConfigMap和Secret管理
  8. 自定义资源定义(CRD)操作
  9. 事件监听和Watch操作
  10. 高级应用场景

基础环境准备

1. 安装必要的包

首先,我们需要安装Python的Kubernetes客户端库:

pip install kubernetes
pip install openshift # 可选,用于OpenShift集群

2. 配置文件准备

import os
from kubernetes import client, config

# 加载kubeconfig配置
config.load_kube_config()

Python Kubernetes客户端介绍

1. 主要模块说明

from kubernetes import client, config, watch
from kubernetes.client import ApiClient
from kubernetes.client.rest import ApiException

主要模块功能:

  • client: 提供各种API操作接口
  • config: 处理配置文件加载
  • watch: 用于监控资源变化
  • ApiClient: 底层API客户端
  • ApiException: 异常处理

连接Kubernetes集群

示例1:基础连接配置

from kubernetes import client, config

def connect_kubernetes():
    """Load the local kubeconfig and return a ready-to-use CoreV1Api client.

    A pod listing across all namespaces (capped at one item by ``limit=1``)
    is issued as a connectivity probe.

    Returns:
        The ``CoreV1Api`` instance on success, or ``None`` if anything
        raised while loading the config or contacting the cluster.
    """
    try:
        config.load_kube_config()
        core_api = client.CoreV1Api()

        # Probe request: asks the API server for at most one pod.
        probe = core_api.list_pod_for_all_namespaces(limit=1)
        pod_count = len(probe.items)
        print(f"连接成功!发现 {pod_count} 个Pod")
    except Exception as err:
        print(f"连接失败:{err}")
        return None
    return core_api

# 测试连接
api = connect_kubernetes()

示例2:多集群配置

def connect_multiple_clusters(clusters=None):
    """Build one CoreV1Api client per cluster, each with its own configuration.

    Each kubeconfig is loaded through ``config.new_client_from_config`` so
    that every cluster gets a dedicated ``ApiClient``. This avoids
    overwriting the process-wide default configuration, which is what
    ``config.load_kube_config`` does on every call.

    Args:
        clusters: Optional mapping of cluster name -> kubeconfig file path.
            When omitted, the example ``prod``/``dev`` paths are used.

    Returns:
        Dict mapping cluster name -> ``CoreV1Api``. Clusters whose
        configuration could not be loaded are reported on stdout and left
        out of the result.
    """
    if clusters is None:
        clusters = {
            'prod': '/path/to/prod-kubeconfig',
            'dev': '/path/to/dev-kubeconfig'
        }

    apis = {}
    for cluster_name, config_file in clusters.items():
        try:
            # Dedicated ApiClient: does not touch the global default config.
            api_client = config.new_client_from_config(config_file=config_file)
            apis[cluster_name] = client.CoreV1Api(api_client=api_client)
            print(f"成功连接到{cluster_name}集群")
        except Exception as e:
            # Best-effort: one unreachable cluster must not block the others.
            print(f"连接{cluster_name}集群失败:{str(e)}")

    return apis

Pod操作实战

示例3:创建Pod

from kubernetes import client, config

def create_pod(name, image, namespace="default"):
    """Create a single-container Pod that exposes container port 80.

    Args:
        name: Used both as the Pod name and as the container name.
        image: Container image reference.
        namespace: Namespace in which the Pod is created.

    Returns:
        The API response for the created Pod, or ``None`` if the API call
        raised ``ApiException``.
    """
    container = client.V1Container(
        name=name,
        image=image,
        ports=[client.V1ContainerPort(container_port=80)],
    )
    manifest = client.V1Pod(
        metadata=client.V1ObjectMeta(name=name),
        spec=client.V1PodSpec(containers=[container]),
    )

    core_api = client.CoreV1Api()
    try:
        created = core_api.create_namespaced_pod(namespace=namespace, body=manifest)
    except ApiException as err:
        print(f"Pod创建失败:{err}")
        return None

    print(f"Pod {name} 创建成功")
    return created

# 使用示例
create_pod("nginx-pod", "nginx:latest")

示例4:查询Pod状态

def get_pod_status(name, namespace="default"):
    """Read one Pod and summarise its status as a plain dict.

    Returns:
        A dict with the keys ``name``, ``status`` (the phase), ``pod_ip``,
        ``host_ip``, ``start_time`` and ``conditions`` (a list of
        ``{"type", "status"}`` dicts, empty when the Pod reports none),
        or ``None`` if the read raised ``ApiException``.
    """
    core_api = client.CoreV1Api()
    try:
        pod = core_api.read_namespaced_pod(name=name, namespace=namespace)
        pod_status = pod.status

        summary = {
            "name": pod.metadata.name,
            "status": pod_status.phase,
            "pod_ip": pod_status.pod_ip,
            "host_ip": pod_status.host_ip,
            "start_time": pod_status.start_time,
        }

        # ``conditions`` may be None on the returned object; treat as empty.
        condition_list = []
        for cond in pod_status.conditions or []:
            condition_list.append({"type": cond.type, "status": cond.status})
        summary["conditions"] = condition_list
        return summary
    except ApiException as err:
        print(f"获取Pod状态失败:{err}")
        return None

# 使用示例
status = get_pod_status("nginx-pod")
print(status)

Deployment管理

示例5:创建Deployment

def create_deployment(name, image, replicas=3, namespace="default"):
    """Create a Deployment whose pods run one container on port 80.

    The label ``app=<name>`` is used for both the selector and the pod
    template, so the Deployment selects exactly the pods it creates.

    Returns:
        The API response for the created Deployment, or ``None`` if the
        API call raised ``ApiException``.
    """
    pod_labels = {"app": name}

    container = client.V1Container(
        name=name,
        image=image,
        ports=[client.V1ContainerPort(container_port=80)],
    )
    pod_template = client.V1PodTemplateSpec(
        metadata=client.V1ObjectMeta(labels={"app": name}),
        spec=client.V1PodSpec(containers=[container]),
    )
    deployment_spec = client.V1DeploymentSpec(
        replicas=replicas,
        selector=client.V1LabelSelector(match_labels=pod_labels),
        template=pod_template,
    )
    manifest = client.V1Deployment(
        metadata=client.V1ObjectMeta(name=name),
        spec=deployment_spec,
    )

    apps_api = client.AppsV1Api()
    try:
        created = apps_api.create_namespaced_deployment(
            namespace=namespace,
            body=manifest,
        )
    except ApiException as err:
        print(f"Deployment创建失败:{err}")
        return None

    print(f"Deployment {name} 创建成功")
    return created

# 使用示例
create_deployment("nginx-deployment", "nginx:latest")

示例6:更新Deployment

def update_deployment(name, new_image, namespace="default"):
    """Change the image of a Deployment's first container and patch it.

    The current Deployment is read, the image of ``containers[0]`` in its
    pod template is replaced, and the modified object is sent back as the
    patch body.

    Returns:
        The API response of the patch call, or ``None`` if reading or
        patching raised ``ApiException``.
    """
    apps_api = client.AppsV1Api()
    try:
        current = apps_api.read_namespaced_deployment(name, namespace)

        # Only the first container of the pod template is touched.
        first_container = current.spec.template.spec.containers[0]
        first_container.image = new_image

        patched = apps_api.patch_namespaced_deployment(
            name=name,
            namespace=namespace,
            body=current,
        )
    except ApiException as err:
        print(f"Deployment更新失败:{err}")
        return None

    print(f"Deployment {name} 更新成功")
    return patched

# 使用示例
update_deployment("nginx-deployment", "nginx:1.19")

Service资源操作

示例7:创建Service

def create_service(name, selector, port, target_port, namespace="default"):
    """Create a Service with a single port mapping.

    Args:
        name: Service name.
        selector: Label selector dict passed to the Service spec.
        port: Port exposed by the Service.
        target_port: Port the Service forwards to.
        namespace: Namespace in which the Service is created.

    Returns:
        The API response for the created Service, or ``None`` if the API
        call raised ``ApiException``.
    """
    port_mapping = client.V1ServicePort(port=port, target_port=target_port)
    manifest = client.V1Service(
        metadata=client.V1ObjectMeta(name=name),
        spec=client.V1ServiceSpec(selector=selector, ports=[port_mapping]),
    )

    core_api = client.CoreV1Api()
    try:
        created = core_api.create_namespaced_service(namespace=namespace, body=manifest)
    except ApiException as err:
        print(f"Service创建失败:{err}")
        return None

    print(f"Service {name} 创建成功")
    return created

# 使用示例
create_service(
    "nginx-service",
    {"app": "nginx-deployment"},
    80,
    80
)

ConfigMap和Secret管理

示例8:创建ConfigMap

def create_configmap(name, data, namespace="default"):
    """Create a ConfigMap holding ``data``.

    Args:
        name: ConfigMap name.
        data: Dict passed through unchanged as the ConfigMap's ``data``.
        namespace: Namespace in which the ConfigMap is created.

    Returns:
        The API response for the created ConfigMap, or ``None`` if the API
        call raised ``ApiException``.
    """
    manifest = client.V1ConfigMap(metadata=client.V1ObjectMeta(name=name), data=data)
    core_api = client.CoreV1Api()

    try:
        created = core_api.create_namespaced_config_map(
            namespace=namespace,
            body=manifest,
        )
    except ApiException as err:
        print(f"ConfigMap创建失败:{err}")
        return None

    print(f"ConfigMap {name} 创建成功")
    return created

# 使用示例
config_data = {
    "app.properties": """
    app.name=myapp
    app.env=production
    """
}
create_configmap("app-config", config_data)

示例9:创建Secret

import base64

def create_secret(name, data, namespace="default"):
    """Create an ``Opaque`` Secret from a dict of plain-text strings.

    Args:
        name: Secret name.
        data: Dict of str -> str; every value is base64-encoded before it
            is placed in the Secret's ``data`` field.
        namespace: Namespace in which the Secret is created.

    Returns:
        The API response for the created Secret, or ``None`` if the API
        call raised ``ApiException``.
    """
    # Values go str -> bytes -> base64 bytes -> str.
    encoded = {}
    for key, plain_value in data.items():
        raw_bytes = plain_value.encode()
        encoded[key] = base64.b64encode(raw_bytes).decode()

    manifest = client.V1Secret(
        metadata=client.V1ObjectMeta(name=name),
        type="Opaque",
        data=encoded,
    )

    core_api = client.CoreV1Api()
    try:
        created = core_api.create_namespaced_secret(namespace=namespace, body=manifest)
    except ApiException as err:
        print(f"Secret创建失败:{err}")
        return None

    print(f"Secret {name} 创建成功")
    return created

# 使用示例
secret_data = {
    "username": "admin",
    "password": "secret123"
}
create_secret("app-secrets", secret_data)

自定义资源定义(CRD)操作

示例10:操作CRD资源

def create_custom_resource(group, version, plural, namespace, body):
    """Create a namespaced custom object through ``CustomObjectsApi``.

    Args:
        group: API group of the custom resource.
        version: API version of the custom resource.
        plural: Plural resource name used in the request.
        namespace: Namespace in which the object is created.
        body: The object manifest, passed through as the request body.

    Returns:
        The API response, or ``None`` if the call raised ``ApiException``.
    """
    objects_api = client.CustomObjectsApi()
    request = {
        "group": group,
        "version": version,
        "namespace": namespace,
        "plural": plural,
        "body": body,
    }

    try:
        created = objects_api.create_namespaced_custom_object(**request)
    except ApiException as err:
        print(f"自定义资源创建失败:{err}")
        return None

    print("自定义资源创建成功")
    return created

# 使用示例
custom_resource = {
    "apiVersion": "stable.example.com/v1",
    "kind": "CronTab",
    "metadata": {
        "name": "my-crontab"
    },
    "spec": {
        "cronSpec": "* * * * */5",
        "image": "my-cron-image"
    }
}

create_custom_resource(
    group="stable.example.com",
    version="v1",
    plural="crontabs",
    namespace="default",
    body=custom_resource
)

事件监听和Watch操作

示例11:监听Pod事件

from kubernetes import watch

def watch_pods(namespace="default"):
    """Stream Pod events for a namespace and print each one.

    Iterates over the watch stream until it ends. An ``ApiException`` is
    reported on stdout; a ``KeyboardInterrupt`` stops the watcher and
    prints a confirmation.
    """
    core_api = client.CoreV1Api()
    watcher = watch.Watch()

    try:
        pod_events = watcher.stream(core_api.list_namespaced_pod, namespace=namespace)
        for change in pod_events:
            changed_pod = change['object']
            change_type = change['type']

            print(f"事件类型: {change_type}")
            print(f"Pod名称: {changed_pod.metadata.name}")
            print(f"Pod状态: {changed_pod.status.phase}")
            print("-------------------")
    except ApiException as err:
        print(f"监听失败:{err}")
    except KeyboardInterrupt:
        watcher.stop()
        print("监听已停止")

# 使用示例
# watch_pods()  # 此函数会持续运行直到被中断

高级应用场景

示例12:批量操作和错误处理

def batch_create_resources(resources):
    """Create several resources and report a per-resource outcome.

    Args:
        resources: Iterable of dicts with the keys ``kind`` (``'Deployment'``
            or ``'Service'``), ``namespace`` and ``spec`` (the client model
            object to create; its ``metadata.name`` is used for reporting).

    Returns:
        ``{'success': [...], 'failed': [...]}``. Success entries carry
        ``kind`` and ``name``; failed entries additionally carry ``error``.
        A resource whose kind is not supported is reported under ``failed``
        instead of being skipped silently.
    """
    # One creator per supported kind. The API object is built inside the
    # callable, so nothing is constructed for kinds that are never requested.
    creators = {
        'Deployment': lambda namespace, body: client.AppsV1Api().create_namespaced_deployment(
            namespace=namespace,
            body=body
        ),
        'Service': lambda namespace, body: client.CoreV1Api().create_namespaced_service(
            namespace=namespace,
            body=body
        ),
    }

    results = {
        'success': [],
        'failed': []
    }

    for resource in resources:
        kind = resource['kind']
        name = resource['spec'].metadata.name

        create = creators.get(kind)
        if create is None:
            # Previously such entries vanished from the report entirely.
            results['failed'].append({
                'kind': kind,
                'name': name,
                'error': f"不支持的资源类型: {kind}"
            })
            continue

        try:
            create(resource['namespace'], resource['spec'])
        except ApiException as e:
            results['failed'].append({
                'kind': kind,
                'name': name,
                'error': str(e)
            })
        else:
            results['success'].append({
                'kind': kind,
                'name': name
            })

    return results

# Usage example: input list in the shape batch_create_resources() expects.
# Each entry names the kind, the target namespace and the client model
# object to create (stored under 'spec').
resources = [
    {
        'kind': 'Deployment',
        'namespace': 'default',
        'spec': client.V1Deployment(
            metadata=client.V1ObjectMeta(name="nginx-deployment"),
            spec=client.V1DeploymentSpec(
                replicas=3,
                selector=client.V1LabelSelector(
                    match_labels={"app": "nginx"}
                ),
                template=client.V1PodTemplateSpec(
                    metadata=client.V1ObjectMeta(
                        labels={"app": "nginx"}
                    ),
                    spec=client.V1PodSpec(
                        containers=[
                            client.V1Container(
                                name="nginx",
                                image="nginx:latest"
                            )
                        ]
                    )
                )
            )
        )
    }
]
		
示例13:资源清理和垃圾回收

def cleanup_resources(namespace="default", label_selector=None):
    """Delete Pods, Deployments and Services in a namespace.

    Resources are listed (optionally filtered by ``label_selector``) and
    deleted one by one, in the order Pods, Deployments, Services.

    Returns:
        A dict with the names of deleted objects under ``pods``,
        ``deployments`` and ``services``, plus per-object failure messages
        under ``errors``. ``None`` is returned if one of the list calls
        raised ``ApiException``.
    """
    core_api = client.CoreV1Api()
    apps_api = client.AppsV1Api()

    summary = {
        'pods': [],
        'deployments': [],
        'services': [],
        'errors': []
    }

    # (result key, label used in error messages, list call, delete call)
    steps = (
        ('pods', 'Pod',
         core_api.list_namespaced_pod, core_api.delete_namespaced_pod),
        ('deployments', 'Deployment',
         apps_api.list_namespaced_deployment, apps_api.delete_namespaced_deployment),
        ('services', 'Service',
         core_api.list_namespaced_service, core_api.delete_namespaced_service),
    )

    try:
        for result_key, label, list_call, delete_call in steps:
            listing = list_call(namespace=namespace, label_selector=label_selector)
            for item in listing.items:
                item_name = item.metadata.name
                try:
                    delete_call(name=item_name, namespace=namespace)
                except ApiException as err:
                    # A failed delete is recorded and the sweep continues.
                    summary['errors'].append(f"{label} {item_name}: {err}")
                else:
                    summary[result_key].append(item_name)
    except ApiException as err:
        print(f"清理资源时发生错误:{err}")
        return None

    return summary

# 使用示例
cleanup_result = cleanup_resources(namespace="default", label_selector="app=nginx")
print("清理结果:", cleanup_result)

示例14:资源健康检查和自动修复

import time
from typing import Dict, List

class ResourceHealthChecker:
    """Inspect Pods and Deployments of one namespace and apply simple repairs."""

    def __init__(self, namespace: str = "default"):
        """Create the API clients used for all checks in ``namespace``."""
        self.namespace = namespace
        self.v1 = client.CoreV1Api()
        self.apps_v1 = client.AppsV1Api()

    def check_pod_health(self) -> Dict[str, List[str]]:
        """Classify the namespace's Pods by phase.

        Returns:
            ``{'unhealthy': [...], 'pending': [...]}`` holding the names of
            Pods in phase ``Failed`` and ``Pending`` respectively, or
            ``None`` if listing the Pods raised ``ApiException``.
        """
        unhealthy_pods = []
        pending_pods = []

        try:
            pods = self.v1.list_namespaced_pod(namespace=self.namespace)

            for pod in pods.items:
                if pod.status.phase == 'Failed':
                    unhealthy_pods.append(pod.metadata.name)
                elif pod.status.phase == 'Pending':
                    pending_pods.append(pod.metadata.name)

            return {
                'unhealthy': unhealthy_pods,
                'pending': pending_pods
            }

        except ApiException as e:
            print(f"检查Pod健康状态时发生错误:{str(e)}")
            return None

    def check_deployment_health(self) -> Dict[str, List[str]]:
        """Find Deployments whose ready replica count differs from their replica count.

        Returns:
            ``{'unhealthy': [...]}`` with the names of Deployments where
            ``status.ready_replicas != status.replicas``, or ``None`` if
            listing the Deployments raised ``ApiException``.
        """
        unhealthy_deployments = []

        try:
            deployments = self.apps_v1.list_namespaced_deployment(namespace=self.namespace)

            for deployment in deployments.items:
                if deployment.status.ready_replicas != deployment.status.replicas:
                    unhealthy_deployments.append(deployment.metadata.name)

            return {
                'unhealthy': unhealthy_deployments
            }

        except ApiException as e:
            print(f"检查Deployment健康状态时发生错误:{str(e)}")
            return None

    def auto_repair(self):
        """Delete failed Pods and restart unhealthy Deployments.

        Failed Pods are deleted. Unhealthy Deployments are patched with a
        fresh ``kubectl.kubernetes.io/restartedAt`` annotation on the pod
        template.

        Returns:
            A list of human-readable strings, one per attempted action
            (successful or failed). A health check that returned ``None``
            contributes no actions.
        """
        # Imported here because the surrounding file never imports datetime;
        # without it the patch below raised NameError.
        from datetime import datetime, timezone

        repair_actions = []

        # Pods: delete everything reported as Failed.
        pod_health = self.check_pod_health()
        if pod_health:
            for unhealthy_pod in pod_health['unhealthy']:
                try:
                    self.v1.delete_namespaced_pod(
                        name=unhealthy_pod,
                        namespace=self.namespace
                    )
                    repair_actions.append(f"删除不健康的Pod: {unhealthy_pod}")
                except ApiException as e:
                    repair_actions.append(f"修复Pod {unhealthy_pod} 失败: {str(e)}")

        # Deployments: change a pod-template annotation to trigger a restart.
        deployment_health = self.check_deployment_health()
        if deployment_health:
            for unhealthy_deployment in deployment_health['unhealthy']:
                try:
                    # Timezone-aware timestamp so the value is unambiguous.
                    restarted_at = datetime.now(timezone.utc).isoformat()
                    patch = {
                        "spec": {
                            "template": {
                                "metadata": {
                                    "annotations": {
                                        "kubectl.kubernetes.io/restartedAt": restarted_at
                                    }
                                }
                            }
                        }
                    }
                    self.apps_v1.patch_namespaced_deployment(
                        name=unhealthy_deployment,
                        namespace=self.namespace,
                        body=patch
                    )
                    repair_actions.append(f"重启Deployment: {unhealthy_deployment}")
                except ApiException as e:
                    repair_actions.append(f"修复Deployment {unhealthy_deployment} 失败: {str(e)}")

        return repair_actions

# 使用示例
health_checker = ResourceHealthChecker("default")
repair_results = health_checker.auto_repair()
print("修复操作:", repair_results)

示例15:自定义控制器实现

from kubernetes import watch
import threading
import queue

class CustomController:
    """Watch Pods and Deployments in a namespace and react to their events.

    Two watcher threads push ``(resource_type, event)`` tuples onto a
    queue; a third thread drains the queue and dispatches each event to a
    per-resource handler.
    """

    def __init__(self, namespace="default"):
        self.namespace = namespace
        self.v1 = client.CoreV1Api()
        self.apps_v1 = client.AppsV1Api()
        self.event_queue = queue.Queue()
        self.running = False

    def start(self):
        """Set the running flag and launch the consumer and watcher threads."""
        self.running = True

        # The event consumer is started first, then the two watchers.
        workers = (
            self._process_events,
            self._watch_pods,
            self._watch_deployments,
        )
        for worker in workers:
            threading.Thread(target=worker).start()

    def stop(self):
        """Clear the running flag; the worker loops test it on each pass."""
        self.running = False

    def _watch_resource(self, label, list_call):
        """Stream events from ``list_call`` and enqueue them tagged with ``label``.

        Any exception from the stream is printed; while the controller is
        still running the loop sleeps five seconds and watches again.
        """
        watcher = watch.Watch()
        while self.running:
            try:
                for event in watcher.stream(list_call, namespace=self.namespace):
                    if not self.running:
                        break
                    self.event_queue.put((label, event))
            except Exception as err:
                print(f"{label}监控异常:{err}")
                if self.running:
                    time.sleep(5)  # back off before re-establishing the watch

    def _watch_pods(self):
        """Watch Pod changes in the controller's namespace."""
        self._watch_resource('Pod', self.v1.list_namespaced_pod)

    def _watch_deployments(self):
        """Watch Deployment changes in the controller's namespace."""
        self._watch_resource('Deployment', self.apps_v1.list_namespaced_deployment)

    def _process_events(self):
        """Drain the event queue while running, dispatching every entry.

        The one-second ``get`` timeout lets the loop re-check the running
        flag when no events arrive.
        """
        while self.running:
            try:
                queued = self.event_queue.get(timeout=1)
                kind, payload = queued
                self._handle_event(kind, payload)
            except queue.Empty:
                continue
            except Exception as err:
                print(f"事件处理异常:{err}")

    def _handle_event(self, resource_type, event):
        """Print an event summary and route it to the matching handler."""
        event_type = event['type']
        obj = event['object']

        print(f"收到{resource_type}事件:")
        print(f"  类型: {event_type}")
        print(f"  名称: {obj.metadata.name}")

        handlers = {
            'Pod': self._handle_pod_event,
            'Deployment': self._handle_deployment_event,
        }
        handler = handlers.get(resource_type)
        if handler is not None:
            handler(event_type, obj)

    def _handle_pod_event(self, event_type, pod):
        """Delete a Pod that was modified into the ``Failed`` phase."""
        if event_type != 'MODIFIED' or pod.status.phase != 'Failed':
            return

        pod_name = pod.metadata.name
        print(f"检测到Pod {pod_name} 失败,尝试重启")
        try:
            self.v1.delete_namespaced_pod(
                name=pod.metadata.name,
                namespace=self.namespace,
            )
        except ApiException as err:
            print(f"重启Pod失败:{err}")

    def _handle_deployment_event(self, event_type, deployment):
        """Report a modified Deployment whose ready and total replicas differ."""
        if event_type != 'MODIFIED':
            return

        status = deployment.status
        if status.ready_replicas != status.replicas:
            print(f"检测到Deployment {deployment.metadata.name} 副本不一致")
            # Hook point: custom reconciliation logic can be added here.

# 使用示例
controller = CustomController("default")
controller.start()

# 运行一段时间后停止
# time.sleep(3600)
# controller.stop()

示例16:资源指标监控

from kubernetes.client import CustomObjectsApi
import time

class MetricsCollector:
    """Read node and pod usage figures from the ``metrics.k8s.io`` API group."""

    def __init__(self):
        self.custom_api = CustomObjectsApi()

    def get_node_metrics(self):
        """Return ``{node_name: {'cpu': ..., 'memory': ...}}`` for all nodes.

        Returns ``None`` if the API call raised ``ApiException``.
        """
        try:
            payload = self.custom_api.list_cluster_custom_object(
                group="metrics.k8s.io",
                version="v1beta1",
                plural="nodes",
            )
            return {
                entry['metadata']['name']: {
                    'cpu': entry['usage']['cpu'],
                    'memory': entry['usage']['memory'],
                }
                for entry in payload['items']
            }
        except ApiException as err:
            print(f"获取节点指标失败:{err}")
            return None

    def get_pod_metrics(self, namespace="default"):
        """Return per-container usage for every pod in ``namespace``.

        The result maps pod name -> container name ->
        ``{'cpu': ..., 'memory': ...}``. Returns ``None`` if the API call
        raised ``ApiException``.
        """
        try:
            payload = self.custom_api.list_namespaced_custom_object(
                group="metrics.k8s.io",
                version="v1beta1",
                namespace=namespace,
                plural="pods",
            )

            usage_by_pod = {}
            for entry in payload['items']:
                pod_name = entry['metadata']['name']
                usage_by_pod[pod_name] = {
                    ctr['name']: {
                        'cpu': ctr['usage']['cpu'],
                        'memory': ctr['usage']['memory'],
                    }
                    for ctr in entry['containers']
                }
            return usage_by_pod
        except ApiException as err:
            print(f"获取Pod指标失败:{err}")
            return None

    def _print_node_usage(self, node_metrics):
        """Print one CPU/memory section per node."""
        print("\n节点资源使用情况:")
        for node_name, usage in node_metrics.items():
            print(f"\n节点: {node_name}")
            print(f"CPU使用: {usage['cpu']}")
            print(f"内存使用: {usage['memory']}")

    def _print_pod_usage(self, pod_metrics):
        """Print CPU/memory for every container, grouped by pod."""
        print("\nPod资源使用情况:")
        for pod_name, containers in pod_metrics.items():
            print(f"\nPod: {pod_name}")
            for container_name, usage in containers.items():
                print(f"容器: {container_name}")
                print(f"CPU使用: {usage['cpu']}")
                print(f"内存使用: {usage['memory']}")

    def monitor_resources(self, interval=30):
        """Print node and pod usage every ``interval`` seconds, forever.

        Pod metrics are read from the default namespace of
        ``get_pod_metrics``. A section is skipped when its metrics are
        empty or could not be fetched. This method never returns.
        """
        while True:
            print("\n=== 资源使用情况 ===")

            node_metrics = self.get_node_metrics()
            if node_metrics:
                self._print_node_usage(node_metrics)

            pod_metrics = self.get_pod_metrics()
            if pod_metrics:
                self._print_pod_usage(pod_metrics)

            time.sleep(interval)

# 使用示例
collector = MetricsCollector()
# collector.monitor_resources()  # 持续监控

最佳实践和注意事项

  1. 错误处理
  • 始终使用try-except块处理API调用
  • 实现重试机制处理临时性故障
  • 记录详细的错误信息便于调试
  2. 性能优化
  • 使用批量操作代替单个操作
  • 实现合适的缓存机制
  • 避免频繁的API调用
  3. 安全考虑
  • 使用最小权限原则
  • 保护敏感信息(如密钥和证书)
  • 实现适当的认证和授权机制
  4. 可维护性
  • 模块化代码结构
  • 完善的日志记录
  • 清晰的代码注释

总结

本文详细介绍了如何使用Python操作Kubernetes集群,包括:

  1. 基础环境配置
  2. 常见资源操作
  3. 高级应用场景
  4. 自动化运维实践
  5. 监控和告警实现

通过这些示例和最佳实践,可以构建强大的Kubernetes自动化工具和运维系统。

本文使用 文章同步助手 同步