Kubernetes CRI Plugin定制开发及应用


kubelet 调用 containerd 内置的 CRI 插件管理容器和镜像,并通过 CNI 插件给 Pod 配置网络。

CRI Plugin部分可定制化部分包括 runtime service 和 image service两部分。我们可以通过对CRI Plugin进行定制化开发,以解决部分在上层Kubernetes部分难以解决的问题。

Runtime Service

Kubernetes CRI Runtime Service Interface

// code ref: <https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/cri-api/pkg/apis/runtime/v1/api.pb.go#L11294>

// RuntimeServiceServer is the server API for RuntimeService service.
type RuntimeServiceServer interface {
	// Version returns the runtime name, runtime version, and runtime API version.
	Version(context.Context, *VersionRequest) (*VersionResponse, error)
	// RunPodSandbox creates and starts a pod-level sandbox. Runtimes must ensure
	// the sandbox is in the ready state on success.
	RunPodSandbox(context.Context, *RunPodSandboxRequest) (*RunPodSandboxResponse, error)
	// StopPodSandbox stops any running process that is part of the sandbox and
	// reclaims network resources (e.g., IP addresses) allocated to the sandbox.
	// If there are any running containers in the sandbox, they must be forcibly
	// terminated.
	// This call is idempotent, and must not return an error if all relevant
	// resources have already been reclaimed. kubelet will call StopPodSandbox
	// at least once before calling RemovePodSandbox. It will also attempt to
	// reclaim resources eagerly, as soon as a sandbox is not needed. Hence,
	// multiple StopPodSandbox calls are expected.
	StopPodSandbox(context.Context, *StopPodSandboxRequest) (*StopPodSandboxResponse, error)
	// RemovePodSandbox removes the sandbox. If there are any running containers
	// in the sandbox, they must be forcibly terminated and removed.
	// This call is idempotent, and must not return an error if the sandbox has
	// already been removed.

	/*
	see more codes in <https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/cri-api/pkg/apis/runtime/v1/api.pb.go>
	*/

	// RuntimeConfig returns configuration information of the runtime.
	// A couple of notes:
	//   - The RuntimeConfigRequest object is not to be confused with the contents of UpdateRuntimeConfigRequest.
	//     The former is for having runtime tell Kubelet what to do, the latter vice versa.
	//   - It is the expectation of the Kubelet that these fields are static for the lifecycle of the Kubelet.
	//     The Kubelet will not re-request the RuntimeConfiguration after startup, and CRI implementations should
	//     avoid updating them without a full node reboot.
	RuntimeConfig(context.Context, *RuntimeConfigRequest) (*RuntimeConfigResponse, error)
}

Containerd RuntimeServiceServer 标准实现

// code ref: <https://github.com/containerd/containerd/blob/v1.7.20/pkg/cri/server/service.go#L63>

// CRIService is the interface implement CRI remote service server.
type CRIService interface {
	runtime.RuntimeServiceServer
	runtime.ImageServiceServer
	// Closer is used by containerd to gracefully stop cri service.
	io.Closer

	Run(ready func()) error

	Register(*grpc.Server) error
}

开发CRI Plugin

kubelet配置

  1. 修改config.yaml kubelet配置如下,修改 containerRuntimeEndpoint 为你将要运行的sock地址 #cat /var/lib/kubelet/config.yaml address: 0.0.0.0 apiVersion: kubelet.config.k8s.io/v1beta1 containerRuntimeEndpoint: unix:///run/containerd/containerd.sock
  2. 修改systemd配置 通过修改systemd中kubelet启动命令修改 CRI runtime endpoint,这种方式已经过时,同时会覆盖config.yaml配置
    “`bash
    # cat /etc/systemd/system/kubelet.service.d/10-kubeadm.conf [Service] Environment="KUBELET_KUBECONFIG_ARGS=--bootstrap-kubeconfig=/etc/kubernetes/bootstrap-kubelet.conf --kubeconfig=/etc/kubernetes/kubelet.conf" Environment="KUBELET_CONFIG_ARGS=--config=/var/lib/kubelet/config.yaml" EnvironmentFile=-/var/lib/kubelet/kubeadm-flags.env Environment="KUBELET_EXTRA_ARGS= \\ --runtime-request-timeout=15m --container-runtime-endpoint=unix:///var/run/containerd/containerd.sock" ExecStart= ExecStart=/usr/bin/kubelet $KUBELET_KUBECONFIG_ARGS $KUBELET_CONFIG_ARGS $KUBELET_KUBEADM_ARGS $KUBELET_EXTRA_ARGS
    “`

实现 RuntimeServiceServer interface

下面是一个CRI Runtime interface最简单的实现,启动 Server 并初始化RuntimeServiceClient,代理所有接口到 runtimeapi.RuntimeServiceClient

package server

import (
	"context"
	"log/slog"
	"net"
	"os"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	
	runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
)

type Server struct {
	options  Options
	client   runtimeapi.RuntimeServiceClient
	server   *grpc.Server
	listener net.Listener
}

// make sure Server impl runtimeapi.RuntimeServiceServer
var _ runtimeapi.RuntimeServiceServer = (*Server)(nil)

func New(options Options) (*Server, error) {
	listener, err := net.Listen("unix", options.ShimSocket)
	if err != nil {
		return nil, err
	}
	server := grpc.NewServer()

	return &Server{
		server:   server,
		listener: listener,
	}, nil
}

func (s *Server) Start() error {
	conn, err := grpc.NewClient(s.options.CRISocket, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		return err
	}
	s.client = runtimeapi.NewRuntimeServiceClient(conn)
	runtimeapi.RegisterRuntimeServiceServer(s.server, s)

	// do serve after client is created and registered
	go func() {
		_ = s.server.Serve(s.listener)
	}()
	return waitForServer(s.options.ShimSocket, time.Second)
}

func (s *Server) Stop() {
	s.server.Stop()
	s.listener.Close()
}

func (s *Server) Version(ctx context.Context, request *runtimeapi.VersionRequest) (*runtimeapi.VersionResponse, error) {
	slog.Info("Doing version request", "request", request)
	resp, err := s.client.Version(ctx, request)
	if err != nil {
		slog.Error("failed to get version", "error", err)
		return resp, err
	}
	slog.Debug("Got version response", "response", resp)
	return resp, err
}

func (s *Server) RunPodSandbox(ctx context.Context, request *runtimeapi.RunPodSandboxRequest) (*runtimeapi.RunPodSandboxResponse, error) {
	slog.Info("Doing run pod sandbox request", "request", request)
	return s.client.RunPodSandbox(ctx, request)
}

func (s *Server) StopPodSandbox(ctx context.Context, request *runtimeapi.StopPodSandboxRequest) (*runtimeapi.StopPodSandboxResponse, error) {
	slog.Info("Doing stop pod sandbox request", "request", request)
	return s.client.StopPodSandbox(ctx, request)
}

func (s *Server) RemovePodSandbox(ctx context.Context, request *runtimeapi.RemovePodSandboxRequest) (*runtimeapi.RemovePodSandboxResponse, error) {
	slog.Info("Doing remove pod sandbox request", "request", request)
	return s.client.RemovePodSandbox(ctx, request)
}

/*
todo: add more funcs to impl RuntimeServiceServer interface
*/

func waitForServer(socket string, timeout time.Duration, opts ...interface{}) error {
	// impl this. wait until socket has been served.
}

应用

通过实现RuntimeServiceServer ,我们可以在kubelet调用创建/删除容器时,添加自己的逻辑,生产中的应用列举:

  1. 在容器创建时注入无感知注入/修改/删除容器 env,并对 k8s apiserver不可见;
  2. 在容器删除/暂停时通过容器commit操作,保存容器状态到镜像中;
  3. 限制具体容器操作,如消耗较大的exec操作等;

tips:由于能获取到的容器信息有限,对容器进行上述操作进行的筛选可以通过容器env判断,也可以通过plugin连接containerd,创建containerd client获取容器更多信息。

Image Service

Kubernetes CRI Image Service Interface

// code ref: <https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/cri-api/pkg/apis/runtime/v1/api.pb.go#L12215>

// ImageServiceServer is the server API for ImageService service.
type ImageServiceServer interface {
	// ListImages lists existing images.
	ListImages(context.Context, *ListImagesRequest) (*ListImagesResponse, error)
	// ImageStatus returns the status of the image. If the image is not
	// present, returns a response with ImageStatusResponse.Image set to
	// nil.
	ImageStatus(context.Context, *ImageStatusRequest) (*ImageStatusResponse, error)
	// PullImage pulls an image with authentication config.
	PullImage(context.Context, *PullImageRequest) (*PullImageResponse, error)
	// RemoveImage removes the image.
	// This call is idempotent, and must not return an error if the image has
	// already been removed.
	RemoveImage(context.Context, *RemoveImageRequest) (*RemoveImageResponse, error)
	// ImageFSInfo returns information of the filesystem that is used to store images.
	ImageFsInfo(context.Context, *ImageFsInfoRequest) (*ImageFsInfoResponse, error)
}

Image Service标准实现

可以发现,RuntimeServiceServer 和 ImageServiceServer 同时在这里实现,具体代码参考 https://github.com/containerd/containerd/blob/main/internal/cri/server/images/service.go#L51。

实现 ImageServiceServer interface

同实现 RuntimeServiceServer interface一样,这里不再赘述。

应用

可以发现 ImageServiceServer interface 中可以定义镜像或者修改镜像信息,那么就有很多应用场景:

  1. 修改镜像拉取地址等操作,适合用于在内网环境下,全量替换k8s镜像拉取地址,修改到内网registry或者代理registry;
  2. 或者从安全性考虑,在pod启动前,提前拉取镜像,验证镜像安全性等。


发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注