kubernetes 开发之 client-go 学习

3月份已经记录了一些开发与 kubernetes 交互的文章,由于工期比较紧,虽然开发完成了,对client-go的学习还是不彻底。这一篇我记录一些我在 kubernetes 开发中的备忘。

访问 k8s 集群获取资源的几种方式:

  1. 命令行 kubectl

  2. http k8s REST API

  3. 代码库 client-go

    1. ClientSet

      ClientSet 可以对k8s的标准资源进行CURD

    2. Dynamic Client

      它能处理 kubernetes 所有的资源,控制所有的 API,处理CRD一般都是用它。请求返回的对象 unstructured.Unstructured 是一个 map[string]interface{}

    3. DiscoveryClient

      这是一种发现客户端,在前面的客户端中需要知道资源的 Resource 和 Version 才能找到你想要的,

      这些信息太多很难全部记住,这个客户端用于获取资源组、版本等信息。

    4. RESTClient

      最基础的客户端,对HTTP Request进行了封装,实现了RESTFul风格的API。

    5. informer

其中,ClientSet、DynamicClient、DiscoveryClient 都是基于 RESTClient 封装的。

一、背景

Kubernetes 官方从2016年8月份开始,将Kubernetes资源操作相关的核心源码抽取出来,独立出来一个项目Client-go,作为官方提供的Go client。

Kubernetes的 部分代码也是基于这个client实现的。

引入包:

go1.16+:

go get k8s.io/client-go@latest

go1.11+ 使用特定版本:

go get k8s.io/client-go@v0.20.4

更多内容参考官方安装手册

二、构造k8s配置文件,生成clientset和dynamic client

无论是使用 ClientSet 还是 Dynamic Client,第一步都是构建 k8s 的配置文件。

通过参数(master的url或者kubeconfig路径)和BuildConfigFromFlags方法来获取rest.Config对象。

kubeconfig路径一般路径为$HOME/.kube/config,用来配置本地连接的kubernetes集群。

package main

import (
    "fmt"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/dynamic"    
    "log"
    "os"
    "path/filepath"
)

// 从文件 ~/.kube/config 中构造config
func getConfigFromFile() (*restclient.Config, error){
    kubeconfig := filepath.Join(os.Getenv("HOME"), ".kube", "config")
    config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
    if err != nil {
        log.Fatal(err)
    }
    
    return config, err
}

const (
	// High enough QPS to fit all expected use cases.
	DefaultQPS = 1e6
	// High enough Burst to fit all expected use cases.
	DefaultBurst = 1e6

)

// 从数据库存储的多集群信息,包括masterIP,kubeconfig和元数据信息中初始化,参考了[k8s多集群go-client实现](https://blog.csdn.net/weixin_43908373/article/details/95618989)

func InitClient(clusterName string) (*kubernetes.Clientset, *rest.Config, dynamic.Interface, error) {
	//数据库取出集群信息
	master, kubeconfig, err := GetClusterInfo(clusterName)
	if err != nil {
		logs.Error("get db for cluster kubeconfig error. %v ", err)
		return nil, nil, err
	}
	kubeconfigJson, err := yaml.YAMLToJSON([]byte(kubeconfig))
	if err != nil {
		logs.Error("yaml to json err")
	}
	configV1 := clientcmdapiv1.Config{}
	err = json.Unmarshal(kubeconfigJson, &configV1)
	if err != nil {
		logs.Error("json unmarshal kubeconfig error. %v ", err)
		return nil, nil, err
	}
	// 切换匹配的版本
	configObject, err := clientcmdlatest.Scheme.ConvertToVersion(&configV1, clientcmdapi.SchemeGroupVersion)
	if err != nil {
		logs.Error("ConvertToVersion error. %v ", err)
		return nil, nil, err
	}
	configInternal := configObject.(*clientcmdapi.Config)

	// 实例化配置信息
	clientConfig, err := clientcmd.NewDefaultClientConfig(*configInternal, &clientcmd.ConfigOverrides{
		ClusterDefaults: clientcmdapi.Cluster{Server: master},
	}).ClientConfig()

	if err != nil {
		logs.Error("build client config error. %v ", err)
		return nil, nil, err
	}
	clientConfig.QPS = defaultQPS
	clientConfig.Burst = defaultBurst
	
	// 实例化客户端
	clientSet, err := kubernetes.NewForConfig(clientConfig)

	if err != nil {
		logs.Error("(%s) kubernetes.NewForConfig(%v) error.%v", master, err, clientConfig)
		return nil, nil, err
	}
	
	dynamicClient, err := dynamic.NewForConfig(config)
	if err != nil {
		logrus.Error(err)
		return nil, nil, err
	}

	return clientSet, clientConfig, dynamicClient, nil
}

三、clientSet

Clientset 是调用 Kubernetes 资源对象最常用的 client,可以操作所有的资源对象。需要指定Group、Version,然后根据 Resource 获取对应的 XXInterface。

通过 *rest.Config 参数和 NewForConfig 方法来获取 clientset 对象,clientset 是多个client的集合,每个client可能包含不同版本的方法调用。

config, err := getConfig()

clientset, err := kubernetes.NewForConfig(config)
// 获取node 列表 
nodes, err := clientset.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
// 更新
_, err = a.client.CoreV1().Nodes().Update(context.TODO(), newNode, metav1.UpdateOptions{})
// 发送patch 指令更新
patchTemplate := map[string]interface{}{
		"metadata": map[string]interface{}{
			"labels": map[string]interface{}{
				labelkey: labelvaule,
			},
		},
	}
patchdata, _ := json.Marshal(patchTemplate)
_, err := clientset.CoreV1().Nodes().Patch(ctx, Nodes[i].Name, types.StrategicMergePatchType, patchdata, metav1.PatchOptions{})

ns 创建

nsSpec := &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: ns}}
_, err := clientset.Core().Namespaces().Create(nsSpec)

secrets 创建

    data := make(map[string][]byte)
    data["user"] = []byte("admin")
    data["password"] = []byte("password")

    // https://godoc.org/k8s.io/api/core/v1
    // https://godoc.org/k8s.io/client-go/kubernetes/typed/core/v1#SecretInterface
    secrets, err := clientset.CoreV1().Secrets("default").Create(&v1.Secret{
            TypeMeta: metav1.TypeMeta{
                    Kind: "Secret",
            },
            ObjectMeta: metav1.ObjectMeta{
                    Name: "generic-secret",
                    Namespace: "default",
            },
            Data: data,
    })

其它

pods, err := clientset.CoreV1().Pods("").List(metav1.ListOptions{})
services, err := clientset.CoreV1().Services("").List(metav1.ListOptions{})
serviceAccounts, err := clientset.CoreV1().ServiceAccounts("").List(metav1.ListOptions{})
deployments, err := clientset.AppsV1().Deployments("").List(metav1.ListOptions{})
pvs, err := clientset.CoreV1().PersistentVolumes().List(metav1.ListOptions{})
pvcs, err := clientset.CoreV1().PersistentVolumeClaims("").List(metav1.ListOptions{})
namespaces, err := clientset.CoreV1().Namespaces().List(metav1.ListOptions{})
ingresses, err := clientset.ExtensionsV1beta1().Ingresses("").List(metav1.ListOptions{})
secrets, err := clientset.CoreV1().Secrets("").List(metav1.ListOptions{})
configMaps, err := clientset.CoreV1().ConfigMaps("").List(metav1.ListOptions{})
ingress, err := clientset.ExtensionsV1beta1().Ingresses("jx").Get("docker-registry", metav1.GetOptions{})

四、Dynamic Client

获得 dynamic client

    config, err := getConfig()
    dynamicClient, err := dynamic.NewForConfig(config)
	if err != nil {
		logrus.Error(err)
		return nil, nil, err
	}

获取资源,以kyverno为例 :

	gvr := getGVR("kyverno.io", "v1", "clusterpolicies")

	list, err := dynamicClient.Resource(gvr).List(metav1.ListOptions{})

	if err != nil {
		logrus.Error(err)
		return
	}
	

// getGVR :- gets GroupVersionResource for dynamic client
func getGVR(group, version, resource string) schema.GroupVersionResource {
	return schema.GroupVersionResource{Group: group, Version: version, Resource: resource}
}

五、DiscoveryClient

DiscoveryClient 在请求到数据之后会缓存到本地,默认存储位置是~/.kube/cache和~/.kube/http-cache,默认是每10分钟会和API Server同步一次。

package main

import (
	"fmt"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/discovery"
	"k8s.io/client-go/tools/clientcmd"
)

func main()  {
	config, err := clientcmd.BuildConfigFromFlags("","/root/.kube/config")
	if err != nil {
		panic(err.Error())
	}

	discoveryClient, err := discovery.NewDiscoveryClientForConfig(config)
	if err != nil {
		panic(err.Error())
	}

	_, APIResourceList, err := discoveryClient.ServerGroupsAndResources()
	if err != nil {
		panic(err.Error())
	}
	for _, list := range APIResourceList {
		gv, err := schema.ParseGroupVersion(list.GroupVersion)
		if err != nil {
			panic(err.Error())
		}
		for _, resource := range list.APIResources {
			fmt.Printf("name: %v, group: %v, version %v\n", resource.Name, gv.Group, gv.Version)
		}
	}
}

六、informer

使用 informer 的目的是为了减轻 apiserver 数据交互的压力而抽象出来的一个 cache 层。

Informer 只会调用 Kubernetes List 和 Watch 两种类型的 API。Informer 在初始化的时,先调用 Kubernetes List API 获得某种 resource 的全部 Object,缓存在内存中; 然后,调用 Watch API 去 watch 这种 resource,去维护这份缓存; 最后,Informer 就不再调用 Kubernetes 的任何 API。

// 通过informer 获取node 列表
factory := informers.NewSharedInformerFactory(clientset, 30*time.Second)
nodeInformer := factory.Core().V1().Nodes()
go nodeInformer.Informer().Run(stopCh)
if !cache.WaitForCacheSync(stopCh, nodeInformer.Informer().HasSynced) {
    runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
    return
}
nodes, err := nodeInformer.Lister().List(labels.NewSelector())

参考资料


golang 文件路径相关操作 golang 创建错误